diff options
Diffstat (limited to 'src/core/channel')
23 files changed, 3634 insertions, 0 deletions
diff --git a/src/core/channel/call_op_string.c b/src/core/channel/call_op_string.c new file mode 100644 index 0000000000..4a98cbfbbb --- /dev/null +++ b/src/core/channel/call_op_string.c @@ -0,0 +1,155 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/channel/channel_stack.h" + +#include <stdarg.h> +#include <stdio.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/string.h> +#include <grpc/support/useful.h> + +#define MAX_APPEND 1024 + +typedef struct { + size_t cap; + size_t len; + char *buffer; +} buf; + +static void bprintf(buf *b, const char *fmt, ...) { + va_list arg; + if (b->len + MAX_APPEND > b->cap) { + b->cap = GPR_MAX(b->len + MAX_APPEND, b->cap * 3 / 2); + b->buffer = gpr_realloc(b->buffer, b->cap); + } + va_start(arg, fmt); + b->len += vsprintf(b->buffer + b->len, fmt, arg); + va_end(arg); +} + +static void bputs(buf *b, const char *s) { + size_t slen = strlen(s); + if (b->len + slen + 1 > b->cap) { + b->cap = GPR_MAX(b->len + slen + 1, b->cap * 3 / 2); + b->buffer = gpr_realloc(b->buffer, b->cap); + } + strcat(b->buffer, s); + b->len += slen; +} + +static void put_metadata(buf *b, grpc_mdelem *md) { + char *txt; + + txt = gpr_hexdump((char *)GPR_SLICE_START_PTR(md->key->slice), + GPR_SLICE_LENGTH(md->key->slice), GPR_HEXDUMP_PLAINTEXT); + bputs(b, " key="); + bputs(b, txt); + gpr_free(txt); + + txt = gpr_hexdump((char *)GPR_SLICE_START_PTR(md->value->slice), + GPR_SLICE_LENGTH(md->value->slice), GPR_HEXDUMP_PLAINTEXT); + bputs(b, " value="); + bputs(b, txt); + gpr_free(txt); +} + +char *grpc_call_op_string(grpc_call_op *op) { + buf b = {0, 0, 0}; + + switch (op->dir) { + case GRPC_CALL_DOWN: + bprintf(&b, ">"); + break; + case GRPC_CALL_UP: + bprintf(&b, "<"); + break; + } + switch (op->type) { + case GRPC_SEND_METADATA: + bprintf(&b, "SEND_METADATA"); + put_metadata(&b, op->data.metadata); + break; + case GRPC_SEND_DEADLINE: + bprintf(&b, "SEND_DEADLINE %d.%09d", op->data.deadline.tv_sec, + op->data.deadline.tv_nsec); + break; + case GRPC_SEND_START: + bprintf(&b, "SEND_START"); + break; + case GRPC_SEND_MESSAGE: + bprintf(&b, "SEND_MESSAGE"); + break; + case GRPC_SEND_FINISH: + bprintf(&b, "SEND_FINISH"); + break; + case GRPC_REQUEST_DATA: + bprintf(&b, "REQUEST_DATA"); + break; + case GRPC_RECV_METADATA: + bprintf(&b, "RECV_METADATA"); + put_metadata(&b, op->data.metadata); + break; + case GRPC_RECV_DEADLINE: + bprintf(&b, "RECV_DEADLINE %d.%09d", op->data.deadline.tv_sec, + op->data.deadline.tv_nsec); + break; + case GRPC_RECV_END_OF_INITIAL_METADATA: + bprintf(&b, "RECV_END_OF_INITIAL_METADATA"); + break; + case GRPC_RECV_MESSAGE: + bprintf(&b, "RECV_MESSAGE"); + break; + case GRPC_RECV_HALF_CLOSE: + bprintf(&b, "RECV_HALF_CLOSE"); + break; + case GRPC_RECV_FINISH: + bprintf(&b, "RECV_FINISH"); + break; + case GRPC_CANCEL_OP: + bprintf(&b, "CANCEL_OP"); + break; + } + bprintf(&b, " flags=0x%08x", op->flags); + + return b.buffer; +} + +void grpc_call_log_op(char *file, int line, gpr_log_severity severity, + grpc_call_element *elem, grpc_call_op *op) { + char *str = grpc_call_op_string(op); + gpr_log(file, line, severity, "OP[%s:%p]: %s", elem->filter->name, elem, str); + gpr_free(str); +} diff --git a/src/core/channel/census_filter.c b/src/core/channel/census_filter.c new file mode 100644 index 0000000000..4285b284ce --- /dev/null +++ b/src/core/channel/census_filter.c @@ -0,0 +1,189 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/channel/census_filter.h" + +#include <stdio.h> +#include <string.h> + +#include "src/core/channel/channel_stack.h" +#include "src/core/channel/noop_filter.h" +#include "src/core/statistics/census_interface.h" +#include "src/core/statistics/census_rpc_stats.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/slice.h> +#include <grpc/support/time.h> + +typedef struct call_data { + census_op_id op_id; + census_rpc_stats stats; + gpr_timespec start_ts; +} call_data; + +typedef struct channel_data { + grpc_mdstr* path_str; /* pointer to meta data str with key == ":path" */ +} channel_data; + +static void init_rpc_stats(census_rpc_stats* stats) { + memset(stats, 0, sizeof(census_rpc_stats)); + stats->cnt = 1; +} + +static double gpr_timespec_to_micros(gpr_timespec t) { + return t.tv_sec * GPR_US_PER_SEC + t.tv_nsec * 1e-3; +} + +static void extract_and_annotate_method_tag(grpc_call_op* op, call_data* calld, + channel_data* chand) { + if (op->data.metadata->key == chand->path_str) { + census_add_method_tag(calld->op_id, (const char*)GPR_SLICE_START_PTR( + op->data.metadata->value->slice)); + } +} + +static void client_call_op(grpc_call_element* elem, grpc_call_op* op) { + call_data* calld = elem->call_data; + channel_data* chand = elem->channel_data; + GPR_ASSERT(calld != NULL); + GPR_ASSERT(chand != NULL); + GPR_ASSERT((calld->op_id.upper != 0) && (calld->op_id.lower != 0)); + switch (op->type) { + case GRPC_SEND_METADATA: + extract_and_annotate_method_tag(op, calld, chand); + break; + case GRPC_RECV_FINISH: + /* Should we stop timing the rpc here? */ + break; + default: + break; + } + /* Always pass control up or down the stack depending on op->dir */ + grpc_call_next_op(elem, op); +} + +static void server_call_op(grpc_call_element* elem, grpc_call_op* op) { + call_data* calld = elem->call_data; + channel_data* chand = elem->channel_data; + GPR_ASSERT(calld != NULL); + GPR_ASSERT(chand != NULL); + GPR_ASSERT((calld->op_id.upper != 0) && (calld->op_id.lower != 0)); + switch (op->type) { + case GRPC_RECV_METADATA: + extract_and_annotate_method_tag(op, calld, chand); + break; + case GRPC_SEND_FINISH: + /* Should we stop timing the rpc here? */ + break; + default: + break; + } + /* Always pass control up or down the stack depending on op->dir */ + grpc_call_next_op(elem, op); +} + +static void channel_op(grpc_channel_element* elem, grpc_channel_op* op) { + switch (op->type) { + case GRPC_TRANSPORT_CLOSED: + /* TODO(hongyu): Annotate trace information for all calls of the channel + */ + break; + default: + break; + } + grpc_channel_next_op(elem, op); +} + +static void client_init_call_elem(grpc_call_element* elem, + const void* server_transport_data) { + call_data* d = elem->call_data; + GPR_ASSERT(d != NULL); + init_rpc_stats(&d->stats); + d->start_ts = gpr_now(); + d->op_id = census_tracing_start_op(); +} + +static void client_destroy_call_elem(grpc_call_element* elem) { + call_data* d = elem->call_data; + GPR_ASSERT(d != NULL); + census_record_rpc_client_stats(d->op_id, &d->stats); + census_tracing_end_op(d->op_id); +} + +static void server_init_call_elem(grpc_call_element* elem, + const void* server_transport_data) { + call_data* d = elem->call_data; + GPR_ASSERT(d != NULL); + init_rpc_stats(&d->stats); + d->start_ts = gpr_now(); + d->op_id = census_tracing_start_op(); +} + +static void server_destroy_call_elem(grpc_call_element* elem) { + call_data* d = elem->call_data; + GPR_ASSERT(d != NULL); + d->stats.elapsed_time_ms = + gpr_timespec_to_micros(gpr_time_sub(gpr_now(), d->start_ts)); + census_record_rpc_server_stats(d->op_id, &d->stats); + census_tracing_end_op(d->op_id); +} + +static void init_channel_elem(grpc_channel_element* elem, + const grpc_channel_args* args, grpc_mdctx* mdctx, + int is_first, int is_last) { + channel_data* chand = elem->channel_data; + GPR_ASSERT(chand != NULL); + GPR_ASSERT(!is_first); + GPR_ASSERT(!is_last); + chand->path_str = grpc_mdstr_from_string(mdctx, ":path"); +} + +static void destroy_channel_elem(grpc_channel_element* elem) {} + +const grpc_channel_filter grpc_client_census_filter = { + client_call_op, channel_op, + + sizeof(call_data), client_init_call_elem, client_destroy_call_elem, + + sizeof(channel_data), init_channel_elem, destroy_channel_elem, + + "census-client"}; + +const grpc_channel_filter grpc_server_census_filter = { + server_call_op, channel_op, + + sizeof(call_data), server_init_call_elem, server_destroy_call_elem, + + sizeof(channel_data), init_channel_elem, destroy_channel_elem, + + "census-server"}; diff --git a/src/core/channel/census_filter.h b/src/core/channel/census_filter.h new file mode 100644 index 0000000000..5b2c01ca9b --- /dev/null +++ b/src/core/channel/census_filter.h @@ -0,0 +1,44 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_CHANNEL_CENSUS_FILTER_H__ +#define __GRPC_INTERNAL_CHANNEL_CENSUS_FILTER_H__ + +#include "src/core/channel/channel_stack.h" + +/* Census filters: provides tracing and stats collection functionalities. It + needs to reside right below the surface filter in the channel stack. */ +extern const grpc_channel_filter grpc_client_census_filter; +extern const grpc_channel_filter grpc_server_census_filter; + +#endif /* __GRPC_INTERNAL_CHANNEL_CENSUS_FILTER_H__ */ diff --git a/src/core/channel/channel_args.c b/src/core/channel/channel_args.c new file mode 100644 index 0000000000..36312e54de --- /dev/null +++ b/src/core/channel/channel_args.c @@ -0,0 +1,112 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/grpc.h> +#include "src/core/channel/channel_args.h" + +#include <grpc/support/alloc.h> +#include <grpc/support/string.h> + +#include <string.h> + +static grpc_arg copy_arg(const grpc_arg *src) { + grpc_arg dst; + dst.type = src->type; + dst.key = gpr_strdup(src->key); + switch (dst.type) { + case GRPC_ARG_STRING: + dst.value.string = gpr_strdup(src->value.string); + break; + case GRPC_ARG_INTEGER: + dst.value.integer = src->value.integer; + break; + case GRPC_ARG_POINTER: + dst.value.pointer = src->value.pointer; + dst.value.pointer.p = src->value.pointer.copy(src->value.pointer.p); + break; + } + return dst; +} + +grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src, + const grpc_arg *to_add) { + grpc_channel_args *dst = gpr_malloc(sizeof(grpc_channel_args)); + size_t i; + size_t src_num_args = (src == NULL) ? 0 : src->num_args; + if (!src && !to_add) { + dst->num_args = 0; + dst->args = NULL; + return dst; + } + dst->num_args = src_num_args + ((to_add == NULL) ? 0 : 1); + dst->args = gpr_malloc(sizeof(grpc_arg) * dst->num_args); + for (i = 0; i < src_num_args; i++) { + dst->args[i] = copy_arg(&src->args[i]); + } + if (to_add != NULL) dst->args[src_num_args] = copy_arg(to_add); + return dst; +} + +grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src) { + return grpc_channel_args_copy_and_add(src, NULL); +} + +void grpc_channel_args_destroy(grpc_channel_args *a) { + size_t i; + for (i = 0; i < a->num_args; i++) { + switch (a->args[i].type) { + case GRPC_ARG_STRING: + gpr_free(a->args[i].value.string); + break; + case GRPC_ARG_INTEGER: + break; + case GRPC_ARG_POINTER: + a->args[i].value.pointer.destroy(a->args[i].value.pointer.p); + break; + } + gpr_free(a->args[i].key); + } + gpr_free(a->args); + gpr_free(a); +} + +int grpc_channel_args_is_census_enabled(const grpc_channel_args *a) { + int i; + if (a == NULL) return 0; + for (i = 0; i < a->num_args; i++) { + if (0 == strcmp(a->args[i].key, GRPC_ARG_ENABLE_CENSUS)) { + return a->args[i].value.integer != 0; + } + } + return 0; +} diff --git a/src/core/channel/channel_args.h b/src/core/channel/channel_args.h new file mode 100644 index 0000000000..cf38d5d01f --- /dev/null +++ b/src/core/channel/channel_args.h @@ -0,0 +1,54 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_CHANNEL_CHANNEL_ARGS_H__ +#define __GRPC_INTERNAL_CHANNEL_CHANNEL_ARGS_H__ + +#include <grpc/grpc.h> + +/* Copy some arguments */ +grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src); + +/* Copy some arguments and add the to_add parameter in the end. + If to_add is NULL, it is equivalent to call grpc_channel_args_copy. */ +grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src, + const grpc_arg *to_add); + +/* Destroy arguments created by grpc_channel_args_copy */ +void grpc_channel_args_destroy(grpc_channel_args *a); + +/* Reads census_enabled settings from channel args. Returns 1 if census_enabled + is specified in channel args, otherwise returns 0. */ +int grpc_channel_args_is_census_enabled(const grpc_channel_args *a); + +#endif /* __GRPC_INTERNAL_CHANNEL_CHANNEL_ARGS_H__ */ diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c new file mode 100644 index 0000000000..a403db35c2 --- /dev/null +++ b/src/core/channel/channel_stack.c @@ -0,0 +1,223 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/channel/channel_stack.h" +#include <grpc/support/log.h> + +#include <stdlib.h> + +/* Memory layouts. + + Channel stack is laid out as: { + grpc_channel_stack stk; + padding to GPR_MAX_ALIGNMENT + grpc_channel_element[stk.count]; + per-filter memory, aligned to GPR_MAX_ALIGNMENT + } + + Call stack is laid out as: { + grpc_call_stack stk; + padding to GPR_MAX_ALIGNMENT + grpc_call_element[stk.count]; + per-filter memory, aligned to GPR_MAX_ALIGNMENT + } */ + +/* Given a size, round up to the next multiple of sizeof(void*) */ +#define ROUND_UP_TO_ALIGNMENT_SIZE(x) \ + (((x)+GPR_MAX_ALIGNMENT - 1) & ~(GPR_MAX_ALIGNMENT - 1)) + +size_t grpc_channel_stack_size(const grpc_channel_filter **filters, + size_t filter_count) { + /* always need the header, and size for the channel elements */ + size_t size = + ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_channel_stack)) + + ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_channel_element)); + size_t i; + + GPR_ASSERT((GPR_MAX_ALIGNMENT & (GPR_MAX_ALIGNMENT - 1)) == 0 && + "GPR_MAX_ALIGNMENT must be a power of two"); + + /* add the size for each filter */ + for (i = 0; i < filter_count; i++) { + size += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_channel_data); + } + + return size; +} + +#define CHANNEL_ELEMS_FROM_STACK(stk) \ + ((grpc_channel_element *)((char *)(stk) + ROUND_UP_TO_ALIGNMENT_SIZE( \ + sizeof(grpc_channel_stack)))) + +#define CALL_ELEMS_FROM_STACK(stk) \ + ((grpc_call_element *)((char *)(stk) + \ + ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)))) + +grpc_channel_element *grpc_channel_stack_element( + grpc_channel_stack *channel_stack, size_t index) { + return CHANNEL_ELEMS_FROM_STACK(channel_stack) + index; +} + +grpc_channel_element *grpc_channel_stack_last_element( + grpc_channel_stack *channel_stack) { + return grpc_channel_stack_element(channel_stack, channel_stack->count - 1); +} + +grpc_call_element *grpc_call_stack_element(grpc_call_stack *call_stack, + size_t index) { + return CALL_ELEMS_FROM_STACK(call_stack) + index; +} + +void grpc_channel_stack_init(const grpc_channel_filter **filters, + size_t filter_count, const grpc_channel_args *args, + grpc_mdctx *metadata_context, + grpc_channel_stack *stack) { + size_t call_size = + ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) + + ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_call_element)); + grpc_channel_element *elems; + char *user_data; + size_t i; + + stack->count = filter_count; + elems = CHANNEL_ELEMS_FROM_STACK(stack); + user_data = + ((char *)elems) + + ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_channel_element)); + + /* init per-filter data */ + for (i = 0; i < filter_count; i++) { + elems[i].filter = filters[i]; + elems[i].channel_data = user_data; + elems[i].filter->init_channel_elem(&elems[i], args, metadata_context, + i == 0, i == (filter_count - 1)); + user_data += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_channel_data); + call_size += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_call_data); + } + + GPR_ASSERT(user_data - (char *)stack == + grpc_channel_stack_size(filters, filter_count)); + + stack->call_stack_size = call_size; +} + +void grpc_channel_stack_destroy(grpc_channel_stack *stack) { + grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(stack); + size_t count = stack->count; + size_t i; + + /* destroy per-filter data */ + for (i = 0; i < count; i++) { + channel_elems[i].filter->destroy_channel_elem(&channel_elems[i]); + } +} + +void grpc_call_stack_init(grpc_channel_stack *channel_stack, + const void *transport_server_data, + grpc_call_stack *call_stack) { + grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack); + size_t count = channel_stack->count; + grpc_call_element *call_elems; + char *user_data; + size_t i; + + call_stack->count = count; + call_elems = CALL_ELEMS_FROM_STACK(call_stack); + user_data = ((char *)call_elems) + + ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element)); + + /* init per-filter data */ + for (i = 0; i < count; i++) { + call_elems[i].filter = channel_elems[i].filter; + call_elems[i].channel_data = channel_elems[i].channel_data; + call_elems[i].call_data = user_data; + call_elems[i].filter->init_call_elem(&call_elems[i], transport_server_data); + user_data += + ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data); + } +} + +void grpc_call_stack_destroy(grpc_call_stack *stack) { + grpc_call_element *elems = CALL_ELEMS_FROM_STACK(stack); + size_t count = stack->count; + size_t i; + + /* destroy per-filter data */ + for (i = 0; i < count; i++) { + elems[i].filter->destroy_call_elem(&elems[i]); + } +} + +void grpc_call_next_op(grpc_call_element *elem, grpc_call_op *op) { + grpc_call_element *next_elem = elem + op->dir; + next_elem->filter->call_op(next_elem, op); +} + +void grpc_channel_next_op(grpc_channel_element *elem, grpc_channel_op *op) { + grpc_channel_element *next_elem = elem + op->dir; + next_elem->filter->channel_op(next_elem, op); +} + +grpc_channel_stack *grpc_channel_stack_from_top_element( + grpc_channel_element *elem) { + return (grpc_channel_stack *)((char *)(elem) - + ROUND_UP_TO_ALIGNMENT_SIZE( + sizeof(grpc_channel_stack))); +} + +grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) { + return (grpc_call_stack *)((char *)(elem) - ROUND_UP_TO_ALIGNMENT_SIZE( + sizeof(grpc_call_stack))); +} + +static void do_nothing(void *user_data, grpc_op_error error) {} + +void grpc_call_element_send_metadata(grpc_call_element *cur_elem, + grpc_mdelem *mdelem) { + grpc_call_op metadata_op; + metadata_op.type = GRPC_SEND_METADATA; + metadata_op.dir = GRPC_CALL_DOWN; + metadata_op.done_cb = do_nothing; + metadata_op.user_data = NULL; + metadata_op.data.metadata = grpc_mdelem_ref(mdelem); + grpc_call_next_op(cur_elem, &metadata_op); +} + +void grpc_call_element_send_cancel(grpc_call_element *cur_elem) { + grpc_call_op cancel_op; + cancel_op.type = GRPC_CANCEL_OP; + cancel_op.dir = GRPC_CALL_DOWN; + cancel_op.done_cb = do_nothing; + cancel_op.user_data = NULL; + grpc_call_next_op(cur_elem, &cancel_op); +} diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h new file mode 100644 index 0000000000..0ae1005e67 --- /dev/null +++ b/src/core/channel/channel_stack.h @@ -0,0 +1,288 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_CHANNEL_CHANNEL_STACK_H__ +#define __GRPC_INTERNAL_CHANNEL_CHANNEL_STACK_H__ + +/* A channel filter defines how operations on a channel are implemented. + Channel filters are chained together to create full channels, and if those + chains are linear, then channel stacks provide a mechanism to minimize + allocations for that chain. + Call stacks are created by channel stacks and represent the per-call data + for that stack. */ + +#include <stddef.h> + +#include <grpc/grpc.h> +#include <grpc/support/log.h> +#include "src/core/transport/transport.h" + +/* #define GRPC_CHANNEL_STACK_TRACE 1 */ + +typedef struct grpc_channel_element grpc_channel_element; +typedef struct grpc_call_element grpc_call_element; + +/* Call operations - things that can be sent and received. + + Threading: + SEND, RECV, and CANCEL ops can be active on a call at the same time, but + only one SEND, one RECV, and one CANCEL can be active at a time. + + If state is shared between send/receive/cancel operations, it is up to + filters to provide their own protection around that. */ +typedef enum { + /* send metadata to the channels peer */ + GRPC_SEND_METADATA, + /* send a deadline */ + GRPC_SEND_DEADLINE, + /* start a connection (corresponds to start_invoke/accept) */ + GRPC_SEND_START, + /* send a message to the channels peer */ + GRPC_SEND_MESSAGE, + /* send half-close to the channels peer */ + GRPC_SEND_FINISH, + /* request that more data be allowed through flow control */ + GRPC_REQUEST_DATA, + /* metadata was received from the channels peer */ + GRPC_RECV_METADATA, + /* receive a deadline */ + GRPC_RECV_DEADLINE, + /* the end of the first batch of metadata was received */ + GRPC_RECV_END_OF_INITIAL_METADATA, + /* a message was received from the channels peer */ + GRPC_RECV_MESSAGE, + /* half-close was received from the channels peer */ + GRPC_RECV_HALF_CLOSE, + /* full close was received from the channels peer */ + GRPC_RECV_FINISH, + /* the call has been abnormally terminated */ + GRPC_CANCEL_OP +} grpc_call_op_type; + +/* The direction of the call. + The values of the enums (1, -1) matter here - they are used to increment + or decrement a pointer to find the next element to call */ +typedef enum { GRPC_CALL_DOWN = 1, GRPC_CALL_UP = -1 } grpc_call_dir; + +/* A single filterable operation to be performed on a call */ +typedef struct { + /* The type of operation we're performing */ + grpc_call_op_type type; + /* The directionality of this call - does the operation begin at the bottom + of the stack and flow up, or does the operation start at the top of the + stack and flow down through the filters. */ + grpc_call_dir dir; + + /* Flags associated with this call: see GRPC_WRITE_* in grpc.h */ + gpr_uint32 flags; + + /* Argument data, matching up with grpc_call_op_type names */ + union { + grpc_byte_buffer *message; + grpc_mdelem *metadata; + gpr_timespec deadline; + } data; + + /* Must be called when processing of this call-op is complete. + Signature chosen to match transport flow control callbacks */ + void (*done_cb)(void *user_data, grpc_op_error error); + /* User data to be passed into done_cb */ + void *user_data; +} grpc_call_op; + +/* returns a string representation of op, that can be destroyed with gpr_free */ +char *grpc_call_op_string(grpc_call_op *op); + +typedef enum { + GRPC_CHANNEL_SHUTDOWN, + GRPC_ACCEPT_CALL, + GRPC_TRANSPORT_CLOSED +} grpc_channel_op_type; + +/* A single filterable operation to be performed on a channel */ +typedef struct { + /* The type of operation we're performing */ + grpc_channel_op_type type; + /* The directionality of this call - is it bubbling up the stack, or down? */ + grpc_call_dir dir; + + /* Argument data, matching up with grpc_channel_op_type names */ + union { + struct { + grpc_transport *transport; + const void *transport_server_data; + } accept_call; + } data; +} grpc_channel_op; + +/* Channel filters specify: + 1. the amount of memory needed in the channel & call (via the sizeof_XXX + members) + 2. functions to initialize and destroy channel & call data + (init_XXX, destroy_XXX) + 3. functions to implement call operations and channel operations (call_op, + channel_op) + 4. a name, which is useful when debugging + + Members are laid out in approximate frequency of use order. */ +typedef struct { + /* Called to eg. send/receive data on a call. + See grpc_call_next_op on how to call the next element in the stack */ + void (*call_op)(grpc_call_element *elem, grpc_call_op *op); + /* Called to handle channel level operations - e.g. new calls, or transport + closure. + See grpc_channel_next_op on how to call the next element in the stack */ + void (*channel_op)(grpc_channel_element *elem, grpc_channel_op *op); + + /* sizeof(per call data) */ + size_t sizeof_call_data; + /* Initialize per call data. + elem is initialized at the start of the call, and elem->call_data is what + needs initializing. + The filter does not need to do any chaining. + server_transport_data is an opaque pointer. If it is NULL, this call is + on a client; if it is non-NULL, then it points to memory owned by the + transport and is on the server. Most filters want to ignore this + argument.*/ + void (*init_call_elem)(grpc_call_element *elem, + const void *server_transport_data); + /* Destroy per call data. + The filter does not need to do any chaining */ + void (*destroy_call_elem)(grpc_call_element *elem); + + /* sizeof(per channel data) */ + size_t sizeof_channel_data; + /* Initialize per-channel data. + elem is initialized at the start of the call, and elem->channel_data is + what needs initializing. + is_first, is_last designate this elements position in the stack, and are + useful for asserting correct configuration by upper layer code. + The filter does not need to do any chaining */ + void (*init_channel_elem)(grpc_channel_element *elem, + const grpc_channel_args *args, + grpc_mdctx *metadata_context, int is_first, + int is_last); + /* Destroy per channel data. + The filter does not need to do any chaining */ + void (*destroy_channel_elem)(grpc_channel_element *elem); + + /* The name of this filter */ + const char *name; +} grpc_channel_filter; + +/* A channel_element tracks its filter and the filter requested memory within + a channel allocation */ +struct grpc_channel_element { + const grpc_channel_filter *filter; + void *channel_data; +}; + +/* A call_element tracks its filter, the filter requested memory within + a channel allocation, and the filter requested memory within a call + allocation */ +struct grpc_call_element { + const grpc_channel_filter *filter; + void *channel_data; + void *call_data; +}; + +/* A channel stack tracks a set of related filters for one channel, and + guarantees they live within a single malloc() allocation */ +typedef struct { + size_t count; + /* Memory required for a call stack (computed at channel stack + initialization) */ + size_t call_stack_size; +} grpc_channel_stack; + +/* A call stack tracks a set of related filters for one call, and guarantees + they live within a single malloc() allocation */ +typedef struct { size_t count; } grpc_call_stack; + +/* Get a channel element given a channel stack and its index */ +grpc_channel_element *grpc_channel_stack_element(grpc_channel_stack *stack, + size_t i); +/* Get the last channel element in a channel stack */ +grpc_channel_element *grpc_channel_stack_last_element( + grpc_channel_stack *stack); +/* Get a call stack element given a call stack and an index */ +grpc_call_element *grpc_call_stack_element(grpc_call_stack *stack, size_t i); + +/* Determine memory required for a channel stack containing a set of filters */ +size_t grpc_channel_stack_size(const grpc_channel_filter **filters, + size_t filter_count); +/* Initialize a channel stack given some filters */ +void grpc_channel_stack_init(const grpc_channel_filter **filters, + size_t filter_count, const grpc_channel_args *args, + grpc_mdctx *metadata_context, + grpc_channel_stack *stack); +/* Destroy a channel stack */ +void grpc_channel_stack_destroy(grpc_channel_stack *stack); + +/* Initialize a call stack given a channel stack. transport_server_data is + expected to be NULL on a client, or an opaque transport owned pointer on the + server. */ +void grpc_call_stack_init(grpc_channel_stack *channel_stack, + const void *transport_server_data, + grpc_call_stack *call_stack); +/* Destroy a call stack */ +void grpc_call_stack_destroy(grpc_call_stack *stack); + +/* Call the next operation (depending on call directionality) in a call stack */ +void grpc_call_next_op(grpc_call_element *elem, grpc_call_op *op); +/* Call the next operation (depending on call directionality) in a channel + stack */ +void grpc_channel_next_op(grpc_channel_element *elem, grpc_channel_op *op); + +/* Given the top element of a channel stack, get the channel stack itself */ +grpc_channel_stack *grpc_channel_stack_from_top_element( + grpc_channel_element *elem); +/* Given the top element of a call stack, get the call stack itself */ +grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem); + +void grpc_call_log_op(char *file, int line, gpr_log_severity severity, + grpc_call_element *elem, grpc_call_op *op); + +void grpc_call_element_send_metadata(grpc_call_element *cur_elem, + grpc_mdelem *elem); +void grpc_call_element_send_cancel(grpc_call_element *cur_elem); + +#ifdef GRPC_CHANNEL_STACK_TRACE +#define GRPC_CALL_LOG_OP(sev, elem, op) grpc_call_log_op(sev, elem, op) +#else +#define GRPC_CALL_LOG_OP(sev, elem, op) \ + do { \ + } while (0) +#endif + +#endif /* __GRPC_INTERNAL_CHANNEL_CHANNEL_STACK_H__ */ diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c new file mode 100644 index 0000000000..90563683d5 --- /dev/null +++ b/src/core/channel/client_channel.c @@ -0,0 +1,641 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/channel/client_channel.h" + +#include <stdio.h> + +#include "src/core/channel/channel_args.h" +#include "src/core/channel/connected_channel.h" +#include "src/core/channel/metadata_buffer.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string.h> +#include <grpc/support/sync.h> +#include <grpc/support/useful.h> + +/* Link back filter: passes up calls to the client channel, pushes down calls + down */ + +typedef struct { grpc_channel_element *back; } lb_channel_data; + +typedef struct { grpc_call_element *back; } lb_call_data; + +static void lb_call_op(grpc_call_element *elem, grpc_call_op *op) { + lb_call_data *calld = elem->call_data; + + switch (op->dir) { + case GRPC_CALL_UP: + calld->back->filter->call_op(calld->back, op); + break; + case GRPC_CALL_DOWN: + grpc_call_next_op(elem, op); + break; + } +} + +/* Currently we assume all channel operations should just be pushed up. */ +static void lb_channel_op(grpc_channel_element *elem, grpc_channel_op *op) { + lb_channel_data *chand = elem->channel_data; + + switch (op->dir) { + case GRPC_CALL_UP: + chand->back->filter->channel_op(chand->back, op); + break; + case GRPC_CALL_DOWN: + grpc_channel_next_op(elem, op); + break; + } +} + +/* Constructor for call_data */ +static void lb_init_call_elem(grpc_call_element *elem, + const void *server_transport_data) {} + +/* Destructor for call_data */ +static void lb_destroy_call_elem(grpc_call_element *elem) {} + +/* Constructor for channel_data */ +static void lb_init_channel_elem(grpc_channel_element *elem, + const grpc_channel_args *args, + grpc_mdctx *metadata_context, int is_first, + int is_last) { + GPR_ASSERT(is_first); + GPR_ASSERT(!is_last); +} + +/* Destructor for channel_data */ +static void lb_destroy_channel_elem(grpc_channel_element *elem) {} + +static const grpc_channel_filter link_back_filter = { + lb_call_op, lb_channel_op, + + sizeof(lb_call_data), lb_init_call_elem, lb_destroy_call_elem, + + sizeof(lb_channel_data), lb_init_channel_elem, lb_destroy_channel_elem, + + "clientchannel.linkback", +}; + +/* Client channel implementation */ + +typedef struct { + size_t inflight_requests; + grpc_channel_stack *channel_stack; +} child_entry; + +typedef struct call_data call_data; + +typedef struct { + /* protects children, child_count, child_capacity, active_child, + transport_setup_initiated + does not protect channel stacks held by children + transport_setup is assumed to be set once during construction */ + gpr_mu mu; + + /* the sending child (points somewhere in children, or NULL) */ + child_entry *active_child; + /* vector of child channels */ + child_entry *children; + size_t child_count; + size_t child_capacity; + + /* calls waiting for a channel to be ready */ + call_data **waiting_children; + size_t waiting_child_count; + size_t waiting_child_capacity; + + /* transport setup for this channel */ + grpc_transport_setup *transport_setup; + int transport_setup_initiated; + + grpc_channel_args *args; + + /* metadata cache */ + grpc_mdelem *cancel_status; +} channel_data; + +typedef enum { + CALL_CREATED, + CALL_WAITING, + CALL_ACTIVE, + CALL_CANCELLED +} call_state; + +struct call_data { + /* owning element */ + grpc_call_element *elem; + + call_state state; + grpc_metadata_buffer pending_metadata; + gpr_timespec deadline; + union { + struct { + /* our child call stack */ + grpc_call_stack *child_stack; + /* ... and the channel stack associated with it */ + grpc_channel_stack *using_stack; + } active; + struct { + void (*on_complete)(void *user_data, grpc_op_error error); + void *on_complete_user_data; + gpr_uint32 start_flags; + } waiting; + } s; +}; + +static int prepare_activate(call_data *calld, child_entry *on_child) { + grpc_call_element *child_elem; + grpc_channel_stack *use_stack = on_child->channel_stack; + + if (calld->state == CALL_CANCELLED) return 0; + + on_child->inflight_requests++; + + /* no more access to calld->s.waiting allowed */ + GPR_ASSERT(calld->state == CALL_WAITING); + calld->state = CALL_ACTIVE; + + /* create a child stack, and record that we're using a particular channel + stack */ + calld->s.active.child_stack = gpr_malloc(use_stack->call_stack_size); + calld->s.active.using_stack = use_stack; + grpc_call_stack_init(use_stack, NULL, calld->s.active.child_stack); + /* initialize the top level link back element */ + child_elem = grpc_call_stack_element(calld->s.active.child_stack, 0); + GPR_ASSERT(child_elem->filter == &link_back_filter); + ((lb_call_data *)child_elem->call_data)->back = calld->elem; + + return 1; +} + +static void do_nothing(void *ignored, grpc_op_error error) {} + +static void complete_activate(call_data *calld, child_entry *on_child, + grpc_call_op *op) { + grpc_call_element *child_elem = + grpc_call_stack_element(calld->s.active.child_stack, 0); + + GPR_ASSERT(calld->state == CALL_ACTIVE); + + /* sending buffered metadata down the stack before the start call */ + grpc_metadata_buffer_flush(&calld->pending_metadata, child_elem); + + if (gpr_time_cmp(calld->deadline, gpr_inf_future) != 0) { + grpc_call_op dop; + dop.type = GRPC_SEND_DEADLINE; + dop.dir = GRPC_CALL_DOWN; + dop.flags = 0; + dop.data.deadline = calld->deadline; + dop.done_cb = do_nothing; + dop.user_data = NULL; + child_elem->filter->call_op(child_elem, &dop); + } + + /* continue the start call down the stack, this nees to happen after metadata + are flushed*/ + child_elem->filter->call_op(child_elem, op); +} + +static void start_rpc(call_data *calld, channel_data *chand, grpc_call_op *op) { + gpr_mu_lock(&chand->mu); + if (calld->state == CALL_CANCELLED) { + gpr_mu_unlock(&chand->mu); + op->done_cb(op->user_data, GRPC_OP_ERROR); + return; + } + GPR_ASSERT(calld->state == CALL_CREATED); + calld->state = CALL_WAITING; + if (chand->active_child) { + /* channel is connected - use the connected stack */ + if (prepare_activate(calld, chand->active_child)) { + gpr_mu_unlock(&chand->mu); + /* activate the request (pass it down) outside the lock */ + complete_activate(calld, chand->active_child, op); + } else { + gpr_mu_unlock(&chand->mu); + } + } else { + /* check to see if we should initiate a connection (if we're not already), + but don't do so until outside the lock to avoid re-entrancy problems if + the callback is immediate */ + int initiate_transport_setup = 0; + if (!chand->transport_setup_initiated) { + chand->transport_setup_initiated = 1; + initiate_transport_setup = 1; + } + /* add this call to the waiting set to be resumed once we have a child + channel stack, growing the waiting set if needed */ + if (chand->waiting_child_count == chand->waiting_child_capacity) { + chand->waiting_child_capacity = + GPR_MAX(chand->waiting_child_capacity * 2, 8); + chand->waiting_children = + gpr_realloc(chand->waiting_children, + chand->waiting_child_capacity * sizeof(call_data *)); + } + calld->s.waiting.on_complete = op->done_cb; + calld->s.waiting.on_complete_user_data = op->user_data; + calld->s.waiting.start_flags = op->flags; + chand->waiting_children[chand->waiting_child_count++] = calld; + gpr_mu_unlock(&chand->mu); + + /* finally initiate transport setup if needed */ + if (initiate_transport_setup) { + grpc_transport_setup_initiate(chand->transport_setup); + } + } +} + +static void remove_waiting_child(channel_data *chand, call_data *calld) { + size_t new_count; + size_t i; + for (i = 0, new_count = 0; i < chand->waiting_child_count; i++) { + if (chand->waiting_children[i] == calld) continue; + chand->waiting_children[new_count++] = chand->waiting_children[i]; + } + GPR_ASSERT(new_count == chand->waiting_child_count - 1 || + new_count == chand->waiting_child_count); + chand->waiting_child_count = new_count; +} + +static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + grpc_call_element *child_elem; + grpc_call_op finish_op; + + gpr_mu_lock(&chand->mu); + switch (calld->state) { + case CALL_ACTIVE: + child_elem = grpc_call_stack_element(calld->s.active.child_stack, 0); + gpr_mu_unlock(&chand->mu); + child_elem->filter->call_op(child_elem, op); + return; /* early out */ + case CALL_WAITING: + remove_waiting_child(chand, calld); + calld->s.waiting.on_complete(calld->s.waiting.on_complete_user_data, + GRPC_OP_ERROR); + /* fallthrough intended */ + case CALL_CREATED: + calld->state = CALL_CANCELLED; + gpr_mu_unlock(&chand->mu); + /* send up a synthesized status */ + finish_op.type = GRPC_RECV_METADATA; + finish_op.dir = GRPC_CALL_UP; + finish_op.flags = 0; + finish_op.data.metadata = grpc_mdelem_ref(chand->cancel_status); + finish_op.done_cb = do_nothing; + finish_op.user_data = NULL; + grpc_call_next_op(elem, &finish_op); + /* send up a finish */ + finish_op.type = GRPC_RECV_FINISH; + finish_op.dir = GRPC_CALL_UP; + finish_op.flags = 0; + finish_op.done_cb = do_nothing; + finish_op.user_data = NULL; + grpc_call_next_op(elem, &finish_op); + return; /* early out */ + case CALL_CANCELLED: + gpr_mu_unlock(&chand->mu); + return; /* early out */ + } + gpr_log(GPR_ERROR, "should never reach here"); + abort(); +} + +static void call_op(grpc_call_element *elem, grpc_call_op *op) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + grpc_call_element *child_elem; + GPR_ASSERT(elem->filter == &grpc_client_channel_filter); + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + + switch (op->type) { + case GRPC_SEND_METADATA: + grpc_metadata_buffer_queue(&calld->pending_metadata, op); + break; + case GRPC_SEND_DEADLINE: + calld->deadline = op->data.deadline; + op->done_cb(op->user_data, GRPC_OP_OK); + break; + case GRPC_SEND_START: + /* filter out the start event to find which child to send on */ + start_rpc(calld, chand, op); + break; + case GRPC_CANCEL_OP: + cancel_rpc(elem, op); + break; + default: + switch (op->dir) { + case GRPC_CALL_UP: + grpc_call_next_op(elem, op); + break; + case GRPC_CALL_DOWN: + child_elem = grpc_call_stack_element(calld->s.active.child_stack, 0); + GPR_ASSERT(calld->state == CALL_ACTIVE); + child_elem->filter->call_op(child_elem, op); + break; + } + break; + } +} + +static void broadcast_channel_op_down(grpc_channel_element *elem, + grpc_channel_op *op) { + channel_data *chand = elem->channel_data; + grpc_channel_element *child_elem; + grpc_channel_stack **children; + size_t child_count; + size_t i; + + /* copy the current set of children, and mark them all as having an inflight + request */ + gpr_mu_lock(&chand->mu); + child_count = chand->child_count; + children = gpr_malloc(sizeof(grpc_channel_stack *) * child_count); + for (i = 0; i < child_count; i++) { + children[i] = chand->children[i].channel_stack; + chand->children[i].inflight_requests++; + } + gpr_mu_unlock(&chand->mu); + + /* send the message down */ + for (i = 0; i < child_count; i++) { + child_elem = grpc_channel_stack_element(children[i], 0); + child_elem->filter->channel_op(child_elem, op); + } + + /* unmark the inflight requests */ + gpr_mu_lock(&chand->mu); + for (i = 0; i < child_count; i++) { + chand->children[i].inflight_requests--; + } + gpr_mu_unlock(&chand->mu); + + gpr_free(children); +} + +static void channel_op(grpc_channel_element *elem, grpc_channel_op *op) { + GPR_ASSERT(elem->filter == &grpc_client_channel_filter); + + switch (op->type) { + default: + switch (op->dir) { + case GRPC_CALL_UP: + grpc_channel_next_op(elem, op); + break; + case GRPC_CALL_DOWN: + broadcast_channel_op_down(elem, op); + break; + } + break; + } +} + +static void error_bad_on_complete(void *arg, grpc_op_error error) { + gpr_log(GPR_ERROR, + "Waiting finished but not started? Bad on_complete callback"); + abort(); +} + +/* Constructor for call_data */ +static void init_call_elem(grpc_call_element *elem, + const void *server_transport_data) { + call_data *calld = elem->call_data; + + GPR_ASSERT(elem->filter == &grpc_client_channel_filter); + GPR_ASSERT(server_transport_data == NULL); + calld->elem = elem; + calld->state = CALL_CREATED; + calld->deadline = gpr_inf_future; + calld->s.waiting.on_complete = error_bad_on_complete; + calld->s.waiting.on_complete_user_data = NULL; + grpc_metadata_buffer_init(&calld->pending_metadata); +} + +/* Destructor for call_data */ +static void destroy_call_elem(grpc_call_element *elem) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + size_t i; + + /* if the metadata buffer is not flushed, destroy it here. */ + grpc_metadata_buffer_destroy(&calld->pending_metadata, GRPC_OP_OK); + /* if the call got activated, we need to destroy the child stack also, and + remove it from the in-flight requests tracked by the child_entry we + picked */ + if (calld->state == CALL_ACTIVE) { + grpc_call_stack_destroy(calld->s.active.child_stack); + gpr_free(calld->s.active.child_stack); + + gpr_mu_lock(&chand->mu); + for (i = 0; i < chand->child_count; i++) { + if (chand->children[i].channel_stack == calld->s.active.using_stack) { + chand->children[i].inflight_requests--; + /* TODO(ctiller): garbage collect channels that are not active + and have no inflight requests */ + } + } + gpr_mu_unlock(&chand->mu); + } +} + +/* Constructor for channel_data */ +static void init_channel_elem(grpc_channel_element *elem, + const grpc_channel_args *args, + grpc_mdctx *metadata_context, int is_first, + int is_last) { + channel_data *chand = elem->channel_data; + char temp[16]; + + GPR_ASSERT(!is_first); + GPR_ASSERT(is_last); + GPR_ASSERT(elem->filter == &grpc_client_channel_filter); + + gpr_mu_init(&chand->mu); + chand->active_child = NULL; + chand->children = NULL; + chand->child_count = 0; + chand->child_capacity = 0; + chand->waiting_children = NULL; + chand->waiting_child_count = 0; + chand->waiting_child_capacity = 0; + chand->transport_setup = NULL; + chand->transport_setup_initiated = 0; + chand->args = grpc_channel_args_copy(args); + + sprintf(temp, "%d", GRPC_STATUS_CANCELLED); + chand->cancel_status = + grpc_mdelem_from_strings(metadata_context, "grpc-status", temp); +} + +/* Destructor for channel_data */ +static void destroy_channel_elem(grpc_channel_element *elem) { + channel_data *chand = elem->channel_data; + size_t i; + + grpc_transport_setup_cancel(chand->transport_setup); + + for (i = 0; i < chand->child_count; i++) { + GPR_ASSERT(chand->children[i].inflight_requests == 0); + grpc_channel_stack_destroy(chand->children[i].channel_stack); + gpr_free(chand->children[i].channel_stack); + } + + grpc_channel_args_destroy(chand->args); + grpc_mdelem_unref(chand->cancel_status); + + gpr_mu_destroy(&chand->mu); + GPR_ASSERT(chand->waiting_child_count == 0); + gpr_free(chand->waiting_children); + gpr_free(chand->children); +} + +const grpc_channel_filter grpc_client_channel_filter = { + call_op, channel_op, + + sizeof(call_data), init_call_elem, destroy_call_elem, + + sizeof(channel_data), init_channel_elem, destroy_channel_elem, + + "clientchannel", +}; + +grpc_transport_setup_result grpc_client_channel_transport_setup_complete( + grpc_channel_stack *channel_stack, grpc_transport *transport, + grpc_channel_filter const **channel_filters, size_t num_channel_filters, + grpc_mdctx *mdctx) { + /* we just got a new transport: lets create a child channel stack for it */ + grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack); + channel_data *chand = elem->channel_data; + grpc_channel_element *lb_elem; + grpc_channel_stack *child_stack; + size_t num_child_filters = 2 + num_channel_filters; + grpc_channel_filter const **child_filters; + grpc_transport_setup_result result; + child_entry *child_ent; + call_data **waiting_children; + size_t waiting_child_count; + size_t i; + grpc_call_op *call_ops; + + /* build the child filter stack */ + child_filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_child_filters); + /* we always need a link back filter to get back to the connected channel */ + child_filters[0] = &link_back_filter; + for (i = 0; i < num_channel_filters; i++) { + child_filters[i + 1] = channel_filters[i]; + } + /* and we always need a connected channel to talk to the transport */ + child_filters[num_child_filters - 1] = &grpc_connected_channel_filter; + + GPR_ASSERT(elem->filter == &grpc_client_channel_filter); + + /* BEGIN LOCKING CHANNEL */ + gpr_mu_lock(&chand->mu); + chand->transport_setup_initiated = 0; + + if (chand->child_count == chand->child_capacity) { + /* realloc will invalidate chand->active_child, but it's reset in the next + stanza anyway */ + chand->child_capacity = + GPR_MAX(2 * chand->child_capacity, chand->child_capacity + 2); + chand->children = gpr_realloc(chand->children, + sizeof(child_entry) * chand->child_capacity); + } + + /* build up the child stack */ + child_stack = + gpr_malloc(grpc_channel_stack_size(child_filters, num_child_filters)); + grpc_channel_stack_init(child_filters, num_child_filters, chand->args, mdctx, + child_stack); + lb_elem = grpc_channel_stack_element(child_stack, 0); + GPR_ASSERT(lb_elem->filter == &link_back_filter); + ((lb_channel_data *)lb_elem->channel_data)->back = elem; + result = grpc_connected_channel_bind_transport(child_stack, transport); + child_ent = &chand->children[chand->child_count++]; + child_ent->channel_stack = child_stack; + child_ent->inflight_requests = 0; + chand->active_child = child_ent; + + /* capture the waiting children - we'll activate them outside the lock + to avoid re-entrancy problems */ + waiting_children = chand->waiting_children; + waiting_child_count = chand->waiting_child_count; + /* bumping up inflight_requests here avoids taking a lock per rpc below */ + + chand->waiting_children = NULL; + chand->waiting_child_count = 0; + chand->waiting_child_capacity = 0; + + call_ops = gpr_malloc(sizeof(grpc_call_op) * waiting_child_count); + + for (i = 0; i < waiting_child_count; i++) { + call_ops[i].type = GRPC_SEND_START; + call_ops[i].dir = GRPC_CALL_DOWN; + call_ops[i].flags = waiting_children[i]->s.waiting.start_flags; + call_ops[i].done_cb = waiting_children[i]->s.waiting.on_complete; + call_ops[i].user_data = + waiting_children[i]->s.waiting.on_complete_user_data; + if (!prepare_activate(waiting_children[i], child_ent)) { + waiting_children[i] = NULL; + call_ops[i].done_cb(call_ops[i].user_data, GRPC_OP_ERROR); + } + } + + /* END LOCKING CHANNEL */ + gpr_mu_unlock(&chand->mu); + + /* activate any pending operations - this is safe to do as we guarantee one + and only one write operation per request at the surface api - if we lose + that guarantee we need to do some curly locking here */ + for (i = 0; i < waiting_child_count; i++) { + if (waiting_children[i]) { + complete_activate(waiting_children[i], child_ent, &call_ops[i]); + } + } + gpr_free(waiting_children); + gpr_free(call_ops); + gpr_free(child_filters); + + return result; +} + +void grpc_client_channel_set_transport_setup(grpc_channel_stack *channel_stack, + grpc_transport_setup *setup) { + /* post construction initialization: set the transport setup pointer */ + grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack); + channel_data *chand = elem->channel_data; + GPR_ASSERT(!chand->transport_setup); + chand->transport_setup = setup; +} diff --git a/src/core/channel/client_channel.h b/src/core/channel/client_channel.h new file mode 100644 index 0000000000..576af64ec7 --- /dev/null +++ b/src/core/channel/client_channel.h @@ -0,0 +1,62 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_CHANNEL_CLIENT_CHANNEL_H__ +#define __GRPC_INTERNAL_CHANNEL_CLIENT_CHANNEL_H__ + +#include "src/core/channel/channel_stack.h" + +/* A client channel is a channel that begins disconnected, and can connect + to some endpoint on demand. If that endpoint disconnects, it will be + connected to again later. + + Calls on a disconnected client channel are queued until a connection is + established. */ + +extern const grpc_channel_filter grpc_client_channel_filter; + +/* post-construction initializer to let the client channel know which + transport setup it should cancel upon destruction, or initiate when it needs + a connection */ +void grpc_client_channel_set_transport_setup(grpc_channel_stack *channel_stack, + grpc_transport_setup *setup); + +/* grpc_transport_setup_callback for binding new transports into a client + channel - user_data should be the channel stack containing the client + channel */ +grpc_transport_setup_result grpc_client_channel_transport_setup_complete( + grpc_channel_stack *channel_stack, grpc_transport *transport, + grpc_channel_filter const **channel_filters, size_t num_channel_filters, + grpc_mdctx *mdctx); + +#endif /* __GRPC_INTERNAL_CHANNEL_CLIENT_CHANNEL_H__ */ diff --git a/src/core/channel/client_setup.c b/src/core/channel/client_setup.c new file mode 100644 index 0000000000..c667e39d9c --- /dev/null +++ b/src/core/channel/client_setup.c @@ -0,0 +1,239 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/channel/client_setup.h" +#include "src/core/channel/channel_args.h" +#include "src/core/channel/channel_stack.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/time.h> + +struct grpc_client_setup { + grpc_transport_setup base; /* must be first */ + void (*initiate)(void *user_data, grpc_client_setup_request *request); + void (*done)(void *user_data); + void *user_data; + grpc_channel_args *args; + grpc_mdctx *mdctx; + grpc_em *em; + grpc_em_alarm backoff_alarm; + gpr_timespec current_backoff_interval; + int in_alarm; + + gpr_mu mu; + grpc_client_setup_request *active_request; + int refs; +}; + +struct grpc_client_setup_request { + /* pointer back to the setup object */ + grpc_client_setup *setup; + gpr_timespec deadline; +}; + +gpr_timespec grpc_client_setup_request_deadline(grpc_client_setup_request *r) { + return r->deadline; +} + +static void destroy_setup(grpc_client_setup *s) { + gpr_mu_destroy(&s->mu); + s->done(s->user_data); + grpc_channel_args_destroy(s->args); + gpr_free(s); +} + +/* initiate handshaking */ +static void setup_initiate(grpc_transport_setup *sp) { + grpc_client_setup *s = (grpc_client_setup *)sp; + grpc_client_setup_request *r = gpr_malloc(sizeof(grpc_client_setup_request)); + int in_alarm = 0; + + r->setup = s; + /* TODO(klempner): Actually set a deadline */ + r->deadline = gpr_inf_future; + + gpr_mu_lock(&s->mu); + GPR_ASSERT(s->refs > 0); + /* there might be more than one request outstanding if the caller calls + initiate in some kind of rapid-fire way: we try to connect each time, + and keep track of the latest request (which is the only one that gets + to finish) */ + if (!s->in_alarm) { + s->active_request = r; + s->refs++; + } else { + /* TODO(klempner): Maybe do something more clever here */ + in_alarm = 1; + } + gpr_mu_unlock(&s->mu); + + if (!in_alarm) { + s->initiate(s->user_data, r); + } else { + gpr_free(r); + } +} + +/* cancel handshaking: cancel all requests, and shutdown (the caller promises + not to initiate again) */ +static void setup_cancel(grpc_transport_setup *sp) { + grpc_client_setup *s = (grpc_client_setup *)sp; + void *ignored; + + gpr_mu_lock(&s->mu); + + GPR_ASSERT(s->refs > 0); + /* effectively cancels the current request (if any) */ + s->active_request = NULL; + if (s->in_alarm) { + grpc_em_alarm_cancel(&s->backoff_alarm, &ignored); + } + if (--s->refs == 0) { + gpr_mu_unlock(&s->mu); + destroy_setup(s); + } else { + gpr_mu_unlock(&s->mu); + } +} + +/* vtable for transport setup */ +static const grpc_transport_setup_vtable setup_vtable = {setup_initiate, + setup_cancel}; + +void grpc_client_setup_create_and_attach( + grpc_channel_stack *newly_minted_channel, const grpc_channel_args *args, + grpc_mdctx *mdctx, + void (*initiate)(void *user_data, grpc_client_setup_request *request), + void (*done)(void *user_data), void *user_data, grpc_em *em) { + grpc_client_setup *s = gpr_malloc(sizeof(grpc_client_setup)); + + s->base.vtable = &setup_vtable; + gpr_mu_init(&s->mu); + s->refs = 1; + s->mdctx = mdctx; + s->initiate = initiate; + s->done = done; + s->user_data = user_data; + s->em = em; + s->active_request = NULL; + s->args = grpc_channel_args_copy(args); + s->current_backoff_interval = gpr_time_from_micros(1000000); + s->in_alarm = 0; + + grpc_client_channel_set_transport_setup(newly_minted_channel, &s->base); +} + +int grpc_client_setup_request_should_continue(grpc_client_setup_request *r) { + int result; + if (gpr_time_cmp(gpr_now(), r->deadline) > 0) { + return 0; + } + gpr_mu_lock(&r->setup->mu); + result = r->setup->active_request == r; + gpr_mu_unlock(&r->setup->mu); + return result; +} + +static void backoff_alarm_done(void *arg /* grpc_client_setup */, + grpc_em_cb_status status) { + grpc_client_setup *s = arg; + grpc_client_setup_request *r = gpr_malloc(sizeof(grpc_client_setup_request)); + r->setup = s; + /* TODO(klempner): Set this to something useful */ + r->deadline = gpr_inf_future; + /* Handle status cancelled? */ + gpr_mu_lock(&s->mu); + s->active_request = r; + s->in_alarm = 0; + if (status != GRPC_CALLBACK_SUCCESS) { + if (0 == --s->refs) { + gpr_mu_unlock(&s->mu); + destroy_setup(s); + gpr_free(r); + return; + } else { + gpr_mu_unlock(&s->mu); + return; + } + } + gpr_mu_unlock(&s->mu); + s->initiate(s->user_data, r); +} + +void grpc_client_setup_request_finish(grpc_client_setup_request *r, + int was_successful) { + int retry = !was_successful; + grpc_client_setup *s = r->setup; + + gpr_mu_lock(&s->mu); + if (s->active_request == r) { + s->active_request = NULL; + } else { + retry = 0; + } + if (!retry && 0 == --s->refs) { + gpr_mu_unlock(&s->mu); + destroy_setup(s); + gpr_free(r); + return; + } + + gpr_free(r); + + if (retry) { + /* TODO(klempner): Replace these values with further consideration. 2x is + probably too aggressive of a backoff. */ + gpr_timespec max_backoff = gpr_time_from_micros(120000000); + GPR_ASSERT(!s->in_alarm); + s->in_alarm = 1; + grpc_em_alarm_init(&s->backoff_alarm, s->em, backoff_alarm_done, s); + grpc_em_alarm_add(&s->backoff_alarm, + gpr_time_add(s->current_backoff_interval, gpr_now())); + s->current_backoff_interval = + gpr_time_add(s->current_backoff_interval, s->current_backoff_interval); + if (gpr_time_cmp(s->current_backoff_interval, max_backoff) > 0) { + s->current_backoff_interval = max_backoff; + } + } + + gpr_mu_unlock(&s->mu); +} + +const grpc_channel_args *grpc_client_setup_get_channel_args( + grpc_client_setup_request *r) { + return r->setup->args; +} + +grpc_mdctx *grpc_client_setup_get_mdctx(grpc_client_setup_request *r) { + return r->setup->mdctx; +} diff --git a/src/core/channel/client_setup.h b/src/core/channel/client_setup.h new file mode 100644 index 0000000000..862c1325a3 --- /dev/null +++ b/src/core/channel/client_setup.h @@ -0,0 +1,68 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_CHANNEL_CLIENT_SETUP_H__ +#define __GRPC_INTERNAL_CHANNEL_CLIENT_SETUP_H__ + +#include "src/core/channel/client_channel.h" +#include "src/core/eventmanager/em.h" +#include "src/core/transport/metadata.h" +#include <grpc/support/time.h> + +/* Convenience API's to simplify transport setup */ + +typedef struct grpc_client_setup grpc_client_setup; +typedef struct grpc_client_setup_request grpc_client_setup_request; + +void grpc_client_setup_create_and_attach( + grpc_channel_stack *newly_minted_channel, const grpc_channel_args *args, + grpc_mdctx *mdctx, + void (*initiate)(void *user_data, grpc_client_setup_request *request), + void (*done)(void *user_data), void *user_data, grpc_em *em); + +/* Check that r is the active request: needs to be performed at each callback. + If this races, we'll have two connection attempts running at once and the + old one will get cleaned up in due course, which is fine. */ +int grpc_client_setup_request_should_continue(grpc_client_setup_request *r); +void grpc_client_setup_request_finish(grpc_client_setup_request *r, + int was_successful); +const grpc_channel_args *grpc_client_setup_get_channel_args( + grpc_client_setup_request *r); + +/* Get the deadline for a request passed in to initiate. Implementations should + make a best effort to honor this deadline. */ +gpr_timespec grpc_client_setup_request_deadline(grpc_client_setup_request *r); + +grpc_mdctx *grpc_client_setup_get_mdctx(grpc_client_setup_request *r); + +#endif /* __GRPC_INTERNAL_CHANNEL_CLIENT_SETUP_H__ */ diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c new file mode 100644 index 0000000000..336472e740 --- /dev/null +++ b/src/core/channel/connected_channel.c @@ -0,0 +1,501 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/channel/connected_channel.h" + +#include <stdarg.h> +#include <stdio.h> +#include <string.h> + +#include "src/core/transport/transport.h" +#include <grpc/byte_buffer.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/slice_buffer.h> +#include <grpc/support/string.h> + +#define MAX_BUFFER_LENGTH 8192 +/* the protobuf library will (by default) start warning at 100megs */ +#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024) + +typedef struct { + grpc_transport *transport; + gpr_uint32 max_message_length; +} channel_data; + +typedef struct { + grpc_call_element *elem; + grpc_stream_op_buffer outgoing_sopb; + + gpr_uint32 max_message_length; + gpr_uint32 incoming_message_length; + gpr_uint8 reading_message; + gpr_uint8 got_metadata_boundary; + gpr_uint8 got_read_close; + gpr_slice_buffer incoming_message; + gpr_uint32 outgoing_buffer_length_estimate; +} call_data; + +/* We perform a small hack to locate transport data alongside the connected + channel data in call allocations, to allow everything to be pulled in minimal + cache line requests */ +#define TRANSPORT_STREAM_FROM_CALL_DATA(calld) ((grpc_stream *)((calld)+1)) +#define CALL_DATA_FROM_TRANSPORT_STREAM(transport_stream) \ + (((call_data *)(transport_stream)) - 1) + +/* Copy the contents of a byte buffer into stream ops */ +static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer, + grpc_stream_op_buffer *sopb) { + size_t i; + + switch (byte_buffer->type) { + case GRPC_BB_SLICE_BUFFER: + for (i = 0; i < byte_buffer->data.slice_buffer.count; i++) { + gpr_slice slice = byte_buffer->data.slice_buffer.slices[i]; + gpr_slice_ref(slice); + grpc_sopb_add_slice(sopb, slice); + } + break; + } +} + +/* Flush queued stream operations onto the transport */ +static void end_bufferable_op(grpc_call_op *op, channel_data *chand, + call_data *calld, int is_last) { + size_t nops; + + if (op->flags & GRPC_WRITE_BUFFER_HINT) { + if (calld->outgoing_buffer_length_estimate < MAX_BUFFER_LENGTH) { + op->done_cb(op->user_data, GRPC_OP_OK); + return; + } + } + + calld->outgoing_buffer_length_estimate = 0; + grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb, op->user_data); + + nops = calld->outgoing_sopb.nops; + calld->outgoing_sopb.nops = 0; + grpc_transport_send_batch(chand->transport, + TRANSPORT_STREAM_FROM_CALL_DATA(calld), + calld->outgoing_sopb.ops, nops, is_last); +} + +/* Intercept a call operation and either push it directly up or translate it + into transport stream operations */ +static void call_op(grpc_call_element *elem, grpc_call_op *op) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + + switch (op->type) { + case GRPC_SEND_METADATA: + grpc_sopb_add_metadata(&calld->outgoing_sopb, op->data.metadata); + grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb, + op->user_data); + break; + case GRPC_SEND_DEADLINE: + grpc_sopb_add_deadline(&calld->outgoing_sopb, op->data.deadline); + grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb, + op->user_data); + break; + case GRPC_SEND_START: + grpc_sopb_add_metadata_boundary(&calld->outgoing_sopb); + end_bufferable_op(op, chand, calld, 0); + break; + case GRPC_SEND_MESSAGE: + grpc_sopb_add_begin_message(&calld->outgoing_sopb, + grpc_byte_buffer_length(op->data.message), + op->flags); + copy_byte_buffer_to_stream_ops(op->data.message, &calld->outgoing_sopb); + calld->outgoing_buffer_length_estimate += + (5 + grpc_byte_buffer_length(op->data.message)); + end_bufferable_op(op, chand, calld, 0); + break; + case GRPC_SEND_FINISH: + end_bufferable_op(op, chand, calld, 1); + break; + case GRPC_REQUEST_DATA: + /* re-arm window updates if they were disarmed by finish_message */ + grpc_transport_set_allow_window_updates( + chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), 1); + break; + case GRPC_CANCEL_OP: + grpc_transport_abort_stream(chand->transport, + TRANSPORT_STREAM_FROM_CALL_DATA(calld), + GRPC_STATUS_CANCELLED); + break; + default: + GPR_ASSERT(op->dir == GRPC_CALL_UP); + grpc_call_next_op(elem, op); + break; + } +} + +/* Currently we assume all channel operations should just be pushed up. */ +static void channel_op(grpc_channel_element *elem, grpc_channel_op *op) { + channel_data *chand = elem->channel_data; + GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); + + switch (op->type) { + case GRPC_CHANNEL_SHUTDOWN: + grpc_transport_close(chand->transport); + break; + default: + GPR_ASSERT(op->dir == GRPC_CALL_UP); + grpc_channel_next_op(elem, op); + break; + } +} + +/* Constructor for call_data */ +static void init_call_elem(grpc_call_element *elem, + const void *server_transport_data) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + int r; + + GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); + calld->elem = elem; + grpc_sopb_init(&calld->outgoing_sopb); + + calld->reading_message = 0; + calld->got_metadata_boundary = 0; + calld->got_read_close = 0; + calld->outgoing_buffer_length_estimate = 0; + calld->max_message_length = chand->max_message_length; + gpr_slice_buffer_init(&calld->incoming_message); + r = grpc_transport_init_stream(chand->transport, + TRANSPORT_STREAM_FROM_CALL_DATA(calld), + server_transport_data); + GPR_ASSERT(r == 0); +} + +/* Destructor for call_data */ +static void destroy_call_elem(grpc_call_element *elem) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); + grpc_sopb_destroy(&calld->outgoing_sopb); + gpr_slice_buffer_destroy(&calld->incoming_message); + grpc_transport_destroy_stream(chand->transport, + TRANSPORT_STREAM_FROM_CALL_DATA(calld)); +} + +/* Constructor for channel_data */ +static void init_channel_elem(grpc_channel_element *elem, + const grpc_channel_args *args, grpc_mdctx *mdctx, + int is_first, int is_last) { + channel_data *cd = (channel_data *)elem->channel_data; + size_t i; + GPR_ASSERT(!is_first); + GPR_ASSERT(is_last); + GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); + cd->transport = NULL; + + cd->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH; + if (args) { + for (i = 0; i < args->num_args; i++) { + if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_MESSAGE_LENGTH)) { + if (args->args[i].type != GRPC_ARG_INTEGER) { + gpr_log(GPR_ERROR, "%s ignored: it must be an integer", + GRPC_ARG_MAX_MESSAGE_LENGTH); + } else if (args->args[i].value.integer < 0) { + gpr_log(GPR_ERROR, "%s ignored: it must be >= 0", + GRPC_ARG_MAX_MESSAGE_LENGTH); + } else { + cd->max_message_length = args->args[i].value.integer; + } + } + } + } +} + +/* Destructor for channel_data */ +static void destroy_channel_elem(grpc_channel_element *elem) { + channel_data *cd = (channel_data *)elem->channel_data; + GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); + grpc_transport_destroy(cd->transport); +} + +const grpc_channel_filter grpc_connected_channel_filter = { + call_op, channel_op, + + sizeof(call_data), init_call_elem, destroy_call_elem, + + sizeof(channel_data), init_channel_elem, destroy_channel_elem, + + "connected", +}; + +static gpr_slice alloc_recv_buffer(void *user_data, grpc_transport *transport, + grpc_stream *stream, size_t size_hint) { + return gpr_slice_malloc(size_hint); +} + +/* Transport callback to accept a new stream... calls up to handle it */ +static void accept_stream(void *user_data, grpc_transport *transport, + const void *transport_server_data) { + grpc_channel_element *elem = user_data; + channel_data *chand = elem->channel_data; + grpc_channel_op op; + + GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); + GPR_ASSERT(chand->transport == transport); + + op.type = GRPC_ACCEPT_CALL; + op.dir = GRPC_CALL_UP; + op.data.accept_call.transport = transport; + op.data.accept_call.transport_server_data = transport_server_data; + channel_op(elem, &op); +} + +static void recv_error(channel_data *chand, call_data *calld, int line, + const char *fmt, ...) { + char msg[512]; + va_list a; + + va_start(a, fmt); + vsprintf(msg, fmt, a); + va_end(a); + + gpr_log(__FILE__, line, GPR_LOG_SEVERITY_ERROR, "%s", msg); + + if (chand->transport) { + grpc_transport_abort_stream(chand->transport, + TRANSPORT_STREAM_FROM_CALL_DATA(calld), + GRPC_STATUS_INVALID_ARGUMENT); + } +} + +static void do_nothing(void *calldata, grpc_op_error error) {} + +static void done_message(void *user_data, grpc_op_error error) { + grpc_byte_buffer_destroy(user_data); +} + +static void finish_message(channel_data *chand, call_data *calld) { + grpc_call_element *elem = calld->elem; + grpc_call_op call_op; + call_op.dir = GRPC_CALL_UP; + call_op.flags = 0; + /* if we got all the bytes for this message, call up the stack */ + call_op.type = GRPC_RECV_MESSAGE; + call_op.done_cb = done_message; + /* TODO(ctiller): this could be a lot faster if coded directly */ + call_op.user_data = call_op.data.message = grpc_byte_buffer_create( + calld->incoming_message.slices, calld->incoming_message.count); + gpr_slice_buffer_reset_and_unref(&calld->incoming_message); + + /* disable window updates until we get a request more from above */ + grpc_transport_set_allow_window_updates( + chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), 0); + + GPR_ASSERT(calld->incoming_message.count == 0); + calld->reading_message = 0; + grpc_call_next_op(elem, &call_op); +} + +/* Handle incoming stream ops from the transport, translating them into + call_ops to pass up the call stack */ +static void recv_batch(void *user_data, grpc_transport *transport, + grpc_stream *stream, grpc_stream_op *ops, + size_t ops_count, grpc_stream_state final_state) { + call_data *calld = CALL_DATA_FROM_TRANSPORT_STREAM(stream); + grpc_call_element *elem = calld->elem; + channel_data *chand = elem->channel_data; + grpc_stream_op *stream_op; + grpc_call_op call_op; + size_t i; + gpr_uint32 length; + + GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); + + for (i = 0; i < ops_count; i++) { + stream_op = ops + i; + switch (stream_op->type) { + case GRPC_OP_FLOW_CTL_CB: + gpr_log(GPR_ERROR, + "should not receive flow control ops from transport"); + abort(); + break; + case GRPC_NO_OP: + break; + case GRPC_OP_METADATA: + call_op.type = GRPC_RECV_METADATA; + call_op.dir = GRPC_CALL_UP; + call_op.flags = 0; + call_op.data.metadata = stream_op->data.metadata; + call_op.done_cb = do_nothing; + call_op.user_data = NULL; + grpc_call_next_op(elem, &call_op); + break; + case GRPC_OP_DEADLINE: + call_op.type = GRPC_RECV_DEADLINE; + call_op.dir = GRPC_CALL_UP; + call_op.flags = 0; + call_op.data.deadline = stream_op->data.deadline; + call_op.done_cb = do_nothing; + call_op.user_data = NULL; + grpc_call_next_op(elem, &call_op); + break; + case GRPC_OP_METADATA_BOUNDARY: + if (!calld->got_metadata_boundary) { + calld->got_metadata_boundary = 1; + call_op.type = GRPC_RECV_END_OF_INITIAL_METADATA; + call_op.dir = GRPC_CALL_UP; + call_op.flags = 0; + call_op.done_cb = do_nothing; + call_op.user_data = NULL; + grpc_call_next_op(elem, &call_op); + } + break; + case GRPC_OP_BEGIN_MESSAGE: + /* can't begin a message when we're still reading a message */ + if (calld->reading_message) { + recv_error(chand, calld, __LINE__, + "Message terminated early; read %d bytes, expected %d", + calld->incoming_message.length, + calld->incoming_message_length); + return; + } + /* stash away parameters, and prepare for incoming slices */ + length = stream_op->data.begin_message.length; + if (length > calld->max_message_length) { + recv_error( + chand, calld, __LINE__, + "Maximum message length of %d exceeded by a message of length %d", + calld->max_message_length, length); + } else if (length > 0) { + calld->reading_message = 1; + calld->incoming_message_length = length; + } else { + finish_message(chand, calld); + } + break; + case GRPC_OP_SLICE: + if (GPR_SLICE_LENGTH(stream_op->data.slice) == 0) { + gpr_slice_unref(stream_op->data.slice); + break; + } + /* we have to be reading a message to know what to do here */ + if (!calld->reading_message) { + recv_error(chand, calld, __LINE__, + "Received payload data while not reading a message"); + return; + } + /* append the slice to the incoming buffer */ + gpr_slice_buffer_add(&calld->incoming_message, stream_op->data.slice); + if (calld->incoming_message.length > calld->incoming_message_length) { + /* if we got too many bytes, complain */ + recv_error(chand, calld, __LINE__, + "Receiving message overflow; read %d bytes, expected %d", + calld->incoming_message.length, + calld->incoming_message_length); + return; + } else if (calld->incoming_message.length == + calld->incoming_message_length) { + finish_message(chand, calld); + } + } + } + /* if the stream closed, then call up the stack to let it know */ + if (!calld->got_read_close && (final_state == GRPC_STREAM_RECV_CLOSED || + final_state == GRPC_STREAM_CLOSED)) { + calld->got_read_close = 1; + if (calld->reading_message) { + recv_error(chand, calld, __LINE__, + "Last message truncated; read %d bytes, expected %d", + calld->incoming_message.length, + calld->incoming_message_length); + return; + } + call_op.type = GRPC_RECV_HALF_CLOSE; + call_op.dir = GRPC_CALL_UP; + call_op.flags = 0; + call_op.done_cb = do_nothing; + call_op.user_data = NULL; + grpc_call_next_op(elem, &call_op); + } + if (final_state == GRPC_STREAM_CLOSED) { + call_op.type = GRPC_RECV_FINISH; + call_op.dir = GRPC_CALL_UP; + call_op.flags = 0; + call_op.done_cb = do_nothing; + call_op.user_data = NULL; + grpc_call_next_op(elem, &call_op); + } +} + +static void transport_closed(void *user_data, grpc_transport *transport) { + /* transport was closed ==> call up and handle it */ + grpc_channel_element *elem = user_data; + channel_data *chand = elem->channel_data; + grpc_channel_op op; + + GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); + GPR_ASSERT(chand->transport == transport); + + op.type = GRPC_TRANSPORT_CLOSED; + op.dir = GRPC_CALL_UP; + channel_op(elem, &op); +} + +const grpc_transport_callbacks connected_channel_transport_callbacks = { + alloc_recv_buffer, accept_stream, recv_batch, transport_closed, +}; + +grpc_transport_setup_result grpc_connected_channel_bind_transport( + grpc_channel_stack *channel_stack, grpc_transport *transport) { + /* Assumes that the connected channel filter is always the last filter + in a channel stack */ + grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack); + channel_data *cd = (channel_data *)elem->channel_data; + grpc_transport_setup_result ret; + GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); + GPR_ASSERT(cd->transport == NULL); + cd->transport = transport; + + /* HACK(ctiller): increase call stack size for the channel to make space + for channel data. We need a cleaner (but performant) way to do this, + and I'm not sure what that is yet. + This is only "safe" because call stacks place no additional data after + the last call element, and the last call element MUST be the connected + channel. */ + channel_stack->call_stack_size += grpc_transport_stream_size(transport); + + ret.user_data = elem; + ret.callbacks = &connected_channel_transport_callbacks; + return ret; +} diff --git a/src/core/channel/connected_channel.h b/src/core/channel/connected_channel.h new file mode 100644 index 0000000000..660ea7ad89 --- /dev/null +++ b/src/core/channel/connected_channel.h @@ -0,0 +1,49 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_CHANNEL_CONNECTED_CHANNEL_H__ +#define __GRPC_INTERNAL_CHANNEL_CONNECTED_CHANNEL_H__ + +#include "src/core/channel/channel_stack.h" + +/* A channel filter representing a channel that is on a connected transport. + This filter performs actual sending and receiving of messages. */ + +extern const grpc_channel_filter grpc_connected_channel_filter; + +/* Post construction fixup: set the transport in the connected channel. + Must be called before any call stack using this filter is used. */ +grpc_transport_setup_result grpc_connected_channel_bind_transport( + grpc_channel_stack *channel_stack, grpc_transport *transport); + +#endif /* __GRPC_INTERNAL_CHANNEL_CONNECTED_CHANNEL_H__ */ diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c new file mode 100644 index 0000000000..b82c7352d3 --- /dev/null +++ b/src/core/channel/http_client_filter.c @@ -0,0 +1,143 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/channel/http_client_filter.h" +#include <grpc/support/log.h> + +typedef struct call_data { + int unused; /* C89 requires at least one struct element */ +} call_data; + +typedef struct channel_data { grpc_mdelem *te_trailers; } channel_data; + +/* used to silence 'variable not used' warnings */ +static void ignore_unused(void *ignored) {} + +/* Called either: + - in response to an API call (or similar) from above, to send something + - a network event (or similar) from below, to receive something + op contains type and call direction information, in addition to the data + that is being sent or received. */ +static void call_op(grpc_call_element *elem, grpc_call_op *op) { + /* grab pointers to our data from the call element */ + call_data *calld = elem->call_data; + channel_data *channeld = elem->channel_data; + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + + ignore_unused(calld); + + switch (op->type) { + case GRPC_SEND_START: + /* just prior to starting, add a te: trailers header */ + grpc_call_element_send_metadata(elem, channeld->te_trailers); + grpc_call_next_op(elem, op); + break; + default: + /* pass control up or down the stack depending on op->dir */ + grpc_call_next_op(elem, op); + break; + } +} + +/* Called on special channel events, such as disconnection or new incoming + calls on the server */ +static void channel_op(grpc_channel_element *elem, grpc_channel_op *op) { + /* grab pointers to our data from the channel element */ + channel_data *channeld = elem->channel_data; + + ignore_unused(channeld); + + switch (op->type) { + default: + /* pass control up or down the stack depending on op->dir */ + grpc_channel_next_op(elem, op); + break; + } +} + +/* Constructor for call_data */ +static void init_call_elem(grpc_call_element *elem, + const void *server_transport_data) { + /* grab pointers to our data from the call element */ + call_data *calld = elem->call_data; + channel_data *channeld = elem->channel_data; + + ignore_unused(channeld); + + /* initialize members */ + calld->unused = 0; +} + +/* Destructor for call_data */ +static void destroy_call_elem(grpc_call_element *elem) { + /* grab pointers to our data from the call element */ + call_data *calld = elem->call_data; + channel_data *channeld = elem->channel_data; + + ignore_unused(calld); + ignore_unused(channeld); +} + +/* Constructor for channel_data */ +static void init_channel_elem(grpc_channel_element *elem, + const grpc_channel_args *args, grpc_mdctx *mdctx, + int is_first, int is_last) { + /* grab pointers to our data from the channel element */ + channel_data *channeld = elem->channel_data; + + /* The first and the last filters tend to be implemented differently to + handle the case that there's no 'next' filter to call on the up or down + path */ + GPR_ASSERT(!is_first); + GPR_ASSERT(!is_last); + + /* initialize members */ + channeld->te_trailers = grpc_mdelem_from_strings(mdctx, "te", "trailers"); +} + +/* Destructor for channel data */ +static void destroy_channel_elem(grpc_channel_element *elem) { + /* grab pointers to our data from the channel element */ + channel_data *channeld = elem->channel_data; + + grpc_mdelem_unref(channeld->te_trailers); +} + +const grpc_channel_filter grpc_http_client_filter = { + call_op, channel_op, + + sizeof(call_data), init_call_elem, destroy_call_elem, + + sizeof(channel_data), init_channel_elem, destroy_channel_elem, + + "http-client"}; diff --git a/src/core/channel/http_client_filter.h b/src/core/channel/http_client_filter.h new file mode 100644 index 0000000000..f939cbd351 --- /dev/null +++ b/src/core/channel/http_client_filter.h @@ -0,0 +1,42 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_CHANNEL_HTTP_CLIENT_FILTER_H__ +#define __GRPC_INTERNAL_CHANNEL_HTTP_CLIENT_FILTER_H__ + +#include "src/core/channel/channel_stack.h" + +/* Processes metadata on the client side for HTTP2 transports */ +extern const grpc_channel_filter grpc_http_client_filter; + +#endif /* __GRPC_INTERNAL_CHANNEL_HTTP_CLIENT_FILTER_H__ */ diff --git a/src/core/channel/http_filter.c b/src/core/channel/http_filter.c new file mode 100644 index 0000000000..b5c154144e --- /dev/null +++ b/src/core/channel/http_filter.c @@ -0,0 +1,139 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/channel/http_filter.h" +#include <grpc/support/log.h> + +typedef struct call_data { + int unused; /* C89 requires at least one struct element */ +} call_data; + +typedef struct channel_data { + int unused; /* C89 requires at least one struct element */ +} channel_data; + +/* used to silence 'variable not used' warnings */ +static void ignore_unused(void *ignored) {} + +/* Called either: + - in response to an API call (or similar) from above, to send something + - a network event (or similar) from below, to receive something + op contains type and call direction information, in addition to the data + that is being sent or received. */ +static void call_op(grpc_call_element *elem, grpc_call_op *op) { + /* grab pointers to our data from the call element */ + call_data *calld = elem->call_data; + channel_data *channeld = elem->channel_data; + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + + ignore_unused(calld); + ignore_unused(channeld); + + switch (op->type) { + default: + /* pass control up or down the stack depending on op->dir */ + grpc_call_next_op(elem, op); + break; + } +} + +/* Called on special channel events, such as disconnection or new incoming + calls on the server */ +static void channel_op(grpc_channel_element *elem, grpc_channel_op *op) { + /* grab pointers to our data from the channel element */ + channel_data *channeld = elem->channel_data; + + ignore_unused(channeld); + + switch (op->type) { + default: + /* pass control up or down the stack depending on op->dir */ + grpc_channel_next_op(elem, op); + break; + } +} + +/* Constructor for call_data */ +static void init_call_elem(grpc_call_element *elem, + const void *server_transport_data) { + /* grab pointers to our data from the call element */ + call_data *calld = elem->call_data; + channel_data *channeld = elem->channel_data; + + /* initialize members */ + calld->unused = channeld->unused; +} + +/* Destructor for call_data */ +static void destroy_call_elem(grpc_call_element *elem) { + /* grab pointers to our data from the call element */ + call_data *calld = elem->call_data; + channel_data *channeld = elem->channel_data; + + ignore_unused(calld); + ignore_unused(channeld); +} + +/* Constructor for channel_data */ +static void init_channel_elem(grpc_channel_element *elem, + const grpc_channel_args *args, grpc_mdctx *mdctx, + int is_first, int is_last) { + /* grab pointers to our data from the channel element */ + channel_data *channeld = elem->channel_data; + + /* The first and the last filters tend to be implemented differently to + handle the case that there's no 'next' filter to call on the up or down + path */ + GPR_ASSERT(!is_first); + GPR_ASSERT(!is_last); + + /* initialize members */ + channeld->unused = 0; +} + +/* Destructor for channel data */ +static void destroy_channel_elem(grpc_channel_element *elem) { + /* grab pointers to our data from the channel element */ + channel_data *channeld = elem->channel_data; + + ignore_unused(channeld); +} + +const grpc_channel_filter grpc_http_filter = { + call_op, channel_op, + + sizeof(call_data), init_call_elem, destroy_call_elem, + + sizeof(channel_data), init_channel_elem, destroy_channel_elem, + + "http"}; diff --git a/src/core/channel/http_filter.h b/src/core/channel/http_filter.h new file mode 100644 index 0000000000..89ad482d35 --- /dev/null +++ b/src/core/channel/http_filter.h @@ -0,0 +1,43 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_CHANNEL_HTTP_FILTER_H__ +#define __GRPC_INTERNAL_CHANNEL_HTTP_FILTER_H__ + +#include "src/core/channel/channel_stack.h" + +/* Processes metadata that is common to both client and server for HTTP2 + transports. */ +extern const grpc_channel_filter grpc_http_filter; + +#endif /* __GRPC_INTERNAL_CHANNEL_HTTP_FILTER_H__ */ diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c new file mode 100644 index 0000000000..b176064813 --- /dev/null +++ b/src/core/channel/http_server_filter.c @@ -0,0 +1,150 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/channel/http_server_filter.h" +#include <grpc/support/log.h> + +typedef struct call_data { + int unused; /* C89 requires at least one struct element */ +} call_data; + +typedef struct channel_data { grpc_mdelem *te_trailers; } channel_data; + +/* used to silence 'variable not used' warnings */ +static void ignore_unused(void *ignored) {} + +/* Called either: + - in response to an API call (or similar) from above, to send something + - a network event (or similar) from below, to receive something + op contains type and call direction information, in addition to the data + that is being sent or received. */ +static void call_op(grpc_call_element *elem, grpc_call_op *op) { + /* grab pointers to our data from the call element */ + call_data *calld = elem->call_data; + channel_data *channeld = elem->channel_data; + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + + ignore_unused(calld); + ignore_unused(channeld); + + switch (op->type) { + case GRPC_RECV_METADATA: + /* check if it's a te: trailers header */ + if (op->data.metadata == channeld->te_trailers) { + /* swallow it */ + grpc_mdelem_unref(op->data.metadata); + op->done_cb(op->user_data, GRPC_OP_OK); + } else { + /* pass the event up */ + grpc_call_next_op(elem, op); + } + break; + default: + /* pass control up or down the stack depending on op->dir */ + grpc_call_next_op(elem, op); + break; + } +} + +/* Called on special channel events, such as disconnection or new incoming + calls on the server */ +static void channel_op(grpc_channel_element *elem, grpc_channel_op *op) { + /* grab pointers to our data from the channel element */ + channel_data *channeld = elem->channel_data; + + ignore_unused(channeld); + + switch (op->type) { + default: + /* pass control up or down the stack depending on op->dir */ + grpc_channel_next_op(elem, op); + break; + } +} + +/* Constructor for call_data */ +static void init_call_elem(grpc_call_element *elem, + const void *server_transport_data) { + /* grab pointers to our data from the call element */ + call_data *calld = elem->call_data; + channel_data *channeld = elem->channel_data; + + ignore_unused(channeld); + + /* initialize members */ + calld->unused = 0; +} + +/* Destructor for call_data */ +static void destroy_call_elem(grpc_call_element *elem) { + /* grab pointers to our data from the call element */ + call_data *calld = elem->call_data; + channel_data *channeld = elem->channel_data; + + ignore_unused(calld); + ignore_unused(channeld); +} + +/* Constructor for channel_data */ +static void init_channel_elem(grpc_channel_element *elem, + const grpc_channel_args *args, grpc_mdctx *mdctx, + int is_first, int is_last) { + /* grab pointers to our data from the channel element */ + channel_data *channeld = elem->channel_data; + + /* The first and the last filters tend to be implemented differently to + handle the case that there's no 'next' filter to call on the up or down + path */ + GPR_ASSERT(!is_first); + GPR_ASSERT(!is_last); + + /* initialize members */ + channeld->te_trailers = grpc_mdelem_from_strings(mdctx, "te", "trailers"); +} + +/* Destructor for channel data */ +static void destroy_channel_elem(grpc_channel_element *elem) { + /* grab pointers to our data from the channel element */ + channel_data *channeld = elem->channel_data; + + grpc_mdelem_unref(channeld->te_trailers); +} + +const grpc_channel_filter grpc_http_server_filter = { + call_op, channel_op, + + sizeof(call_data), init_call_elem, destroy_call_elem, + + sizeof(channel_data), init_channel_elem, destroy_channel_elem, + + "http-server"}; diff --git a/src/core/channel/http_server_filter.h b/src/core/channel/http_server_filter.h new file mode 100644 index 0000000000..5b475432aa --- /dev/null +++ b/src/core/channel/http_server_filter.h @@ -0,0 +1,42 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_CHANNEL_HTTP_SERVER_FILTER_H__ +#define __GRPC_INTERNAL_CHANNEL_HTTP_SERVER_FILTER_H__ + +#include "src/core/channel/channel_stack.h" + +/* Processes metadata on the client side for HTTP2 transports */ +extern const grpc_channel_filter grpc_http_server_filter; + +#endif /* __GRPC_INTERNAL_CHANNEL_HTTP_SERVER_FILTER_H__ */ diff --git a/src/core/channel/metadata_buffer.c b/src/core/channel/metadata_buffer.c new file mode 100644 index 0000000000..75fd90b707 --- /dev/null +++ b/src/core/channel/metadata_buffer.c @@ -0,0 +1,198 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/channel/metadata_buffer.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/useful.h> + +#include <string.h> + +#define INITIAL_ELEM_CAP 8 + +/* One queued call; we track offsets to string data in a shared buffer to + reduce allocations. See grpc_metadata_buffer_impl for the memory use + strategy */ +typedef struct { + grpc_mdelem *md; + void (*cb)(void *user_data, grpc_op_error error); + void *user_data; + gpr_uint32 flags; +} qelem; + +/* Memory layout: + + grpc_metadata_buffer_impl + followed by an array of qelem */ +struct grpc_metadata_buffer_impl { + /* number of elements in q */ + size_t elems; + /* capacity of q */ + size_t elem_cap; +}; + +#define ELEMS(buffer) ((qelem *)((buffer)+1)) + +void grpc_metadata_buffer_init(grpc_metadata_buffer *buffer) { + /* start buffer as NULL, indicating no elements */ + *buffer = NULL; +} + +void grpc_metadata_buffer_destroy(grpc_metadata_buffer *buffer, + grpc_op_error error) { + size_t i; + qelem *qe; + if (*buffer) { + for (i = 0; i < (*buffer)->elems; i++) { + qe = &ELEMS(*buffer)[i]; + grpc_mdelem_unref(qe->md); + qe->cb(qe->user_data, error); + } + gpr_free(*buffer); + } +} + +void grpc_metadata_buffer_queue(grpc_metadata_buffer *buffer, + grpc_call_op *op) { + grpc_metadata_buffer_impl *impl = *buffer; + qelem *qe; + size_t bytes; + + GPR_ASSERT(op->type == GRPC_SEND_METADATA || op->type == GRPC_RECV_METADATA); + + if (!impl) { + /* this is the first element: allocate enough space to hold the + header object and the initial element capacity of qelems */ + bytes = + sizeof(grpc_metadata_buffer_impl) + INITIAL_ELEM_CAP * sizeof(qelem); + impl = gpr_malloc(bytes); + /* initialize the header object */ + impl->elems = 0; + impl->elem_cap = INITIAL_ELEM_CAP; + } else if (impl->elems == impl->elem_cap) { + /* more qelems than what we can deal with: grow by doubling size */ + impl->elem_cap *= 2; + bytes = sizeof(grpc_metadata_buffer_impl) + impl->elem_cap * sizeof(qelem); + impl = gpr_realloc(impl, bytes); + } + + /* append an element to the queue */ + qe = &ELEMS(impl)[impl->elems]; + impl->elems++; + + qe->md = op->data.metadata; + qe->cb = op->done_cb; + qe->user_data = op->user_data; + qe->flags = op->flags; + + /* header object may have changed location: store it back */ + *buffer = impl; +} + +void grpc_metadata_buffer_flush(grpc_metadata_buffer *buffer, + grpc_call_element *elem) { + grpc_metadata_buffer_impl *impl = *buffer; + grpc_call_op op; + qelem *qe; + size_t i; + + if (!impl) { + /* nothing to send */ + return; + } + + /* construct call_op's, and push them down the stack */ + op.type = GRPC_SEND_METADATA; + op.dir = GRPC_CALL_DOWN; + for (i = 0; i < impl->elems; i++) { + qe = &ELEMS(impl)[i]; + op.done_cb = qe->cb; + op.user_data = qe->user_data; + op.flags = qe->flags; + op.data.metadata = qe->md; + grpc_call_next_op(elem, &op); + } + + /* free data structures and reset to NULL: we can only flush once */ + gpr_free(impl); + *buffer = NULL; +} + +size_t grpc_metadata_buffer_count(const grpc_metadata_buffer *buffer) { + return *buffer ? (*buffer)->elems : 0; +} + +typedef struct { grpc_metadata_buffer_impl *impl; } elems_hdr; + +grpc_metadata *grpc_metadata_buffer_extract_elements( + grpc_metadata_buffer *buffer) { + grpc_metadata_buffer_impl *impl; + elems_hdr *hdr; + qelem *src; + grpc_metadata *out; + size_t i; + + impl = *buffer; + + if (!impl) { + return NULL; + } + + hdr = gpr_malloc(sizeof(elems_hdr) + impl->elems * sizeof(grpc_metadata)); + src = ELEMS(impl); + out = (grpc_metadata *)(hdr + 1); + + hdr->impl = impl; + for (i = 0; i < impl->elems; i++) { + out[i].key = (char *)grpc_mdstr_as_c_string(src[i].md->key); + out[i].value = (char *)grpc_mdstr_as_c_string(src[i].md->value); + out[i].value_length = GPR_SLICE_LENGTH(src[i].md->value->slice); + } + + /* clear out buffer (it's not possible to extract elements twice */ + *buffer = NULL; + + return out; +} + +void grpc_metadata_buffer_cleanup_elements(void *elements, + grpc_op_error error) { + elems_hdr *hdr = ((elems_hdr *)elements) - 1; + + if (!elements) { + return; + } + + grpc_metadata_buffer_destroy(&hdr->impl, error); + gpr_free(hdr); +} diff --git a/src/core/channel/metadata_buffer.h b/src/core/channel/metadata_buffer.h new file mode 100644 index 0000000000..818b290ce2 --- /dev/null +++ b/src/core/channel/metadata_buffer.h @@ -0,0 +1,70 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_CHANNEL_METADATA_BUFFER_H__ +#define __GRPC_INTERNAL_CHANNEL_METADATA_BUFFER_H__ + +#include "src/core/channel/channel_stack.h" + +/* Utility code to buffer GRPC_SEND_METADATA calls and pass them down the stack + all at once at some otherwise-determined time. Useful for implementing + filters that want to queue metadata until a START event chooses some + underlying filter stack to send an rpc on. */ + +/* Clients should declare a member of grpc_metadata_buffer. This may at some + point become a typedef for a struct, but for now a pointer suffices */ +typedef struct grpc_metadata_buffer_impl grpc_metadata_buffer_impl; +typedef grpc_metadata_buffer_impl *grpc_metadata_buffer; + +/* Initializes the metadata buffer. Allocates no memory. */ +void grpc_metadata_buffer_init(grpc_metadata_buffer *buffer); +/* Destroy the metadata buffer. */ +void grpc_metadata_buffer_destroy(grpc_metadata_buffer *buffer, + grpc_op_error error); +/* Append a call to the end of a metadata buffer: may allocate memory */ +void grpc_metadata_buffer_queue(grpc_metadata_buffer *buffer, grpc_call_op *op); +/* Flush all queued operations from the metadata buffer to the element below + self */ +void grpc_metadata_buffer_flush(grpc_metadata_buffer *buffer, + grpc_call_element *self); +/* Count the number of queued elements in the buffer. */ +size_t grpc_metadata_buffer_count(const grpc_metadata_buffer *buffer); +/* Extract elements as a grpc_metadata*, for presentation to applications. + The returned buffer must be freed with + grpc_metadata_buffer_cleanup_elements. + Clears the metadata buffer (this is a one-shot operation) */ +grpc_metadata *grpc_metadata_buffer_extract_elements( + grpc_metadata_buffer *buffer); +void grpc_metadata_buffer_cleanup_elements(void *elements, grpc_op_error error); + +#endif /* __GRPC_INTERNAL_CHANNEL_METADATA_BUFFER_H__ */ diff --git a/src/core/channel/noop_filter.c b/src/core/channel/noop_filter.c new file mode 100644 index 0000000000..705df4a707 --- /dev/null +++ b/src/core/channel/noop_filter.c @@ -0,0 +1,138 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/channel/noop_filter.h" +#include <grpc/support/log.h> + +typedef struct call_data { + int unused; /* C89 requires at least one struct element */ +} call_data; + +typedef struct channel_data { + int unused; /* C89 requires at least one struct element */ +} channel_data; + +/* used to silence 'variable not used' warnings */ +static void ignore_unused(void *ignored) {} + +/* Called either: + - in response to an API call (or similar) from above, to send something + - a network event (or similar) from below, to receive something + op contains type and call direction information, in addition to the data + that is being sent or received. */ +static void call_op(grpc_call_element *elem, grpc_call_op *op) { + /* grab pointers to our data from the call element */ + call_data *calld = elem->call_data; + channel_data *channeld = elem->channel_data; + + ignore_unused(calld); + ignore_unused(channeld); + + switch (op->type) { + default: + /* pass control up or down the stack depending on op->dir */ + grpc_call_next_op(elem, op); + break; + } +} + +/* Called on special channel events, such as disconnection or new incoming + calls on the server */ +static void channel_op(grpc_channel_element *elem, grpc_channel_op *op) { + /* grab pointers to our data from the channel element */ + channel_data *channeld = elem->channel_data; + + ignore_unused(channeld); + + switch (op->type) { + default: + /* pass control up or down the stack depending on op->dir */ + grpc_channel_next_op(elem, op); + break; + } +} + +/* Constructor for call_data */ +static void init_call_elem(grpc_call_element *elem, + const void *server_transport_data) { + /* grab pointers to our data from the call element */ + call_data *calld = elem->call_data; + channel_data *channeld = elem->channel_data; + + /* initialize members */ + calld->unused = channeld->unused; +} + +/* Destructor for call_data */ +static void destroy_call_elem(grpc_call_element *elem) { + /* grab pointers to our data from the call element */ + call_data *calld = elem->call_data; + channel_data *channeld = elem->channel_data; + + ignore_unused(calld); + ignore_unused(channeld); +} + +/* Constructor for channel_data */ +static void init_channel_elem(grpc_channel_element *elem, + const grpc_channel_args *args, grpc_mdctx *mdctx, + int is_first, int is_last) { + /* grab pointers to our data from the channel element */ + channel_data *channeld = elem->channel_data; + + /* The first and the last filters tend to be implemented differently to + handle the case that there's no 'next' filter to call on the up or down + path */ + GPR_ASSERT(!is_first); + GPR_ASSERT(!is_last); + + /* initialize members */ + channeld->unused = 0; +} + +/* Destructor for channel data */ +static void destroy_channel_elem(grpc_channel_element *elem) { + /* grab pointers to our data from the channel element */ + channel_data *channeld = elem->channel_data; + + ignore_unused(channeld); +} + +const grpc_channel_filter grpc_no_op_filter = { + call_op, channel_op, + + sizeof(call_data), init_call_elem, destroy_call_elem, + + sizeof(channel_data), init_channel_elem, destroy_channel_elem, + + "no-op"}; diff --git a/src/core/channel/noop_filter.h b/src/core/channel/noop_filter.h new file mode 100644 index 0000000000..4057ff7ac9 --- /dev/null +++ b/src/core/channel/noop_filter.h @@ -0,0 +1,44 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_CHANNEL_NOOP_FILTER_H__ +#define __GRPC_INTERNAL_CHANNEL_NOOP_FILTER_H__ + +#include "src/core/channel/channel_stack.h" + +/* No-op filter: simply takes everything it's given, and passes it on to the + next filter. Exists simply as a starting point that others can take and + customize for their own filters */ +extern const grpc_channel_filter grpc_no_op_filter; + +#endif /* __GRPC_INTERNAL_CHANNEL_NOOP_FILTER_H__ */ |