diff options
Diffstat (limited to 'src/core/channel')
-rw-r--r-- | src/core/channel/channel_args.c | 251 | ||||
-rw-r--r-- | src/core/channel/channel_args.h | 25 | ||||
-rw-r--r-- | src/core/channel/channel_stack.c | 187 | ||||
-rw-r--r-- | src/core/channel/channel_stack.h | 88 | ||||
-rw-r--r-- | src/core/channel/client_channel.c | 1031 | ||||
-rw-r--r-- | src/core/channel/client_channel.h | 22 | ||||
-rw-r--r-- | src/core/channel/compress_filter.c | 451 | ||||
-rw-r--r-- | src/core/channel/connected_channel.c | 118 | ||||
-rw-r--r-- | src/core/channel/connected_channel.h | 3 | ||||
-rw-r--r-- | src/core/channel/context.h | 8 | ||||
-rw-r--r-- | src/core/channel/http_client_filter.c | 313 | ||||
-rw-r--r-- | src/core/channel/http_server_filter.c | 382 | ||||
-rw-r--r-- | src/core/channel/noop_filter.c | 88 |
13 files changed, 1408 insertions, 1559 deletions
diff --git a/src/core/channel/channel_args.c b/src/core/channel/channel_args.c index 4207a97114..487db1119a 100644 --- a/src/core/channel/channel_args.c +++ b/src/core/channel/channel_args.c @@ -41,206 +41,169 @@ #include <string.h> -static grpc_arg -copy_arg (const grpc_arg * src) -{ +static grpc_arg copy_arg(const grpc_arg *src) { grpc_arg dst; dst.type = src->type; - dst.key = gpr_strdup (src->key); - switch (dst.type) - { + dst.key = gpr_strdup(src->key); + switch (dst.type) { case GRPC_ARG_STRING: - dst.value.string = gpr_strdup (src->value.string); + dst.value.string = gpr_strdup(src->value.string); break; case GRPC_ARG_INTEGER: dst.value.integer = src->value.integer; break; case GRPC_ARG_POINTER: dst.value.pointer = src->value.pointer; - dst.value.pointer.p = src->value.pointer.copy ? src->value.pointer.copy (src->value.pointer.p) : src->value.pointer.p; + dst.value.pointer.p = src->value.pointer.copy + ? src->value.pointer.copy(src->value.pointer.p) + : src->value.pointer.p; break; - } + } return dst; } -grpc_channel_args * -grpc_channel_args_copy_and_add (const grpc_channel_args * src, const grpc_arg * to_add, size_t num_to_add) -{ - grpc_channel_args *dst = gpr_malloc (sizeof (grpc_channel_args)); +grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src, + const grpc_arg *to_add, + size_t num_to_add) { + grpc_channel_args *dst = gpr_malloc(sizeof(grpc_channel_args)); size_t i; size_t src_num_args = (src == NULL) ? 0 : src->num_args; - if (!src && !to_add) - { - dst->num_args = 0; - dst->args = NULL; - return dst; - } + if (!src && !to_add) { + dst->num_args = 0; + dst->args = NULL; + return dst; + } dst->num_args = src_num_args + num_to_add; - dst->args = gpr_malloc (sizeof (grpc_arg) * dst->num_args); - for (i = 0; i < src_num_args; i++) - { - dst->args[i] = copy_arg (&src->args[i]); - } - for (i = 0; i < num_to_add; i++) - { - dst->args[i + src_num_args] = copy_arg (&to_add[i]); - } + dst->args = gpr_malloc(sizeof(grpc_arg) * dst->num_args); + for (i = 0; i < src_num_args; i++) { + dst->args[i] = copy_arg(&src->args[i]); + } + for (i = 0; i < num_to_add; i++) { + dst->args[i + src_num_args] = copy_arg(&to_add[i]); + } return dst; } -grpc_channel_args * -grpc_channel_args_copy (const grpc_channel_args * src) -{ - return grpc_channel_args_copy_and_add (src, NULL, 0); +grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src) { + return grpc_channel_args_copy_and_add(src, NULL, 0); } -grpc_channel_args * -grpc_channel_args_merge (const grpc_channel_args * a, const grpc_channel_args * b) -{ - return grpc_channel_args_copy_and_add (a, b->args, b->num_args); +grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a, + const grpc_channel_args *b) { + return grpc_channel_args_copy_and_add(a, b->args, b->num_args); } -void -grpc_channel_args_destroy (grpc_channel_args * a) -{ +void grpc_channel_args_destroy(grpc_channel_args *a) { size_t i; - for (i = 0; i < a->num_args; i++) - { - switch (a->args[i].type) - { - case GRPC_ARG_STRING: - gpr_free (a->args[i].value.string); - break; - case GRPC_ARG_INTEGER: - break; - case GRPC_ARG_POINTER: - if (a->args[i].value.pointer.destroy) - { - a->args[i].value.pointer.destroy (a->args[i].value.pointer.p); - } - break; - } - gpr_free (a->args[i].key); + for (i = 0; i < a->num_args; i++) { + switch (a->args[i].type) { + case GRPC_ARG_STRING: + gpr_free(a->args[i].value.string); + break; + case GRPC_ARG_INTEGER: + break; + case GRPC_ARG_POINTER: + if (a->args[i].value.pointer.destroy) { + a->args[i].value.pointer.destroy(a->args[i].value.pointer.p); + } + break; } - gpr_free (a->args); - gpr_free (a); + gpr_free(a->args[i].key); + } + gpr_free(a->args); + gpr_free(a); } -int -grpc_channel_args_is_census_enabled (const grpc_channel_args * a) -{ +int grpc_channel_args_is_census_enabled(const grpc_channel_args *a) { size_t i; - if (a == NULL) - return 0; - for (i = 0; i < a->num_args; i++) - { - if (0 == strcmp (a->args[i].key, GRPC_ARG_ENABLE_CENSUS)) - { - return a->args[i].value.integer != 0; - } + if (a == NULL) return 0; + for (i = 0; i < a->num_args; i++) { + if (0 == strcmp(a->args[i].key, GRPC_ARG_ENABLE_CENSUS)) { + return a->args[i].value.integer != 0; } + } return 0; } -grpc_compression_algorithm -grpc_channel_args_get_compression_algorithm (const grpc_channel_args * a) -{ +grpc_compression_algorithm grpc_channel_args_get_compression_algorithm( + const grpc_channel_args *a) { size_t i; - if (a == NULL) - return 0; - for (i = 0; i < a->num_args; ++i) - { - if (a->args[i].type == GRPC_ARG_INTEGER && !strcmp (GRPC_COMPRESSION_ALGORITHM_ARG, a->args[i].key)) - { - return (grpc_compression_algorithm) a->args[i].value.integer; - break; - } + if (a == NULL) return 0; + for (i = 0; i < a->num_args; ++i) { + if (a->args[i].type == GRPC_ARG_INTEGER && + !strcmp(GRPC_COMPRESSION_ALGORITHM_ARG, a->args[i].key)) { + return (grpc_compression_algorithm)a->args[i].value.integer; + break; } + } return GRPC_COMPRESS_NONE; } -grpc_channel_args * -grpc_channel_args_set_compression_algorithm (grpc_channel_args * a, grpc_compression_algorithm algorithm) -{ +grpc_channel_args *grpc_channel_args_set_compression_algorithm( + grpc_channel_args *a, grpc_compression_algorithm algorithm) { grpc_arg tmp; tmp.type = GRPC_ARG_INTEGER; tmp.key = GRPC_COMPRESSION_ALGORITHM_ARG; tmp.value.integer = algorithm; - return grpc_channel_args_copy_and_add (a, &tmp, 1); + return grpc_channel_args_copy_and_add(a, &tmp, 1); } /** Returns 1 if the argument for compression algorithm's enabled states bitset * was found in \a a, returning the arg's value in \a states. Otherwise, returns * 0. */ -static int -find_compression_algorithm_states_bitset (const grpc_channel_args * a, int **states_arg) -{ - if (a != NULL) - { - size_t i; - for (i = 0; i < a->num_args; ++i) - { - if (a->args[i].type == GRPC_ARG_INTEGER && !strcmp (GRPC_COMPRESSION_ALGORITHM_STATE_ARG, a->args[i].key)) - { - *states_arg = &a->args[i].value.integer; - return 1; /* GPR_TRUE */ - } - } +static int find_compression_algorithm_states_bitset(const grpc_channel_args *a, + int **states_arg) { + if (a != NULL) { + size_t i; + for (i = 0; i < a->num_args; ++i) { + if (a->args[i].type == GRPC_ARG_INTEGER && + !strcmp(GRPC_COMPRESSION_ALGORITHM_STATE_ARG, a->args[i].key)) { + *states_arg = &a->args[i].value.integer; + return 1; /* GPR_TRUE */ + } } - return 0; /* GPR_FALSE */ + } + return 0; /* GPR_FALSE */ } -grpc_channel_args * -grpc_channel_args_compression_algorithm_set_state (grpc_channel_args ** a, grpc_compression_algorithm algorithm, int state) -{ +grpc_channel_args *grpc_channel_args_compression_algorithm_set_state( + grpc_channel_args **a, grpc_compression_algorithm algorithm, int state) { int *states_arg; grpc_channel_args *result = *a; - const int states_arg_found = find_compression_algorithm_states_bitset (*a, &states_arg); + const int states_arg_found = + find_compression_algorithm_states_bitset(*a, &states_arg); - if (states_arg_found) - { - if (state != 0) - { - GPR_BITSET ((unsigned *) states_arg, algorithm); - } - else - { - GPR_BITCLEAR ((unsigned *) states_arg, algorithm); - } + if (states_arg_found) { + if (state != 0) { + GPR_BITSET((unsigned *)states_arg, algorithm); + } else { + GPR_BITCLEAR((unsigned *)states_arg, algorithm); } - else - { - /* create a new arg */ - grpc_arg tmp; - tmp.type = GRPC_ARG_INTEGER; - tmp.key = GRPC_COMPRESSION_ALGORITHM_STATE_ARG; - /* all enabled by default */ - tmp.value.integer = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; - if (state != 0) - { - GPR_BITSET ((unsigned *) &tmp.value.integer, algorithm); - } - else - { - GPR_BITCLEAR ((unsigned *) &tmp.value.integer, algorithm); - } - result = grpc_channel_args_copy_and_add (*a, &tmp, 1); - grpc_channel_args_destroy (*a); - *a = result; + } else { + /* create a new arg */ + grpc_arg tmp; + tmp.type = GRPC_ARG_INTEGER; + tmp.key = GRPC_COMPRESSION_ALGORITHM_STATE_ARG; + /* all enabled by default */ + tmp.value.integer = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; + if (state != 0) { + GPR_BITSET((unsigned *)&tmp.value.integer, algorithm); + } else { + GPR_BITCLEAR((unsigned *)&tmp.value.integer, algorithm); } + result = grpc_channel_args_copy_and_add(*a, &tmp, 1); + grpc_channel_args_destroy(*a); + *a = result; + } return result; } -int -grpc_channel_args_compression_algorithm_get_states (const grpc_channel_args * a) -{ +int grpc_channel_args_compression_algorithm_get_states( + const grpc_channel_args *a) { int *states_arg; - if (find_compression_algorithm_states_bitset (a, &states_arg)) - { - return *states_arg; - } - else - { - return (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; /* All algs. enabled */ - } + if (find_compression_algorithm_states_bitset(a, &states_arg)) { + return *states_arg; + } else { + return (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; /* All algs. enabled */ + } } diff --git a/src/core/channel/channel_args.h b/src/core/channel/channel_args.h index bcfd8c46a0..f9e7b05860 100644 --- a/src/core/channel/channel_args.h +++ b/src/core/channel/channel_args.h @@ -38,29 +38,34 @@ #include <grpc/grpc.h> /* Copy some arguments */ -grpc_channel_args *grpc_channel_args_copy (const grpc_channel_args * src); +grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src); /** Copy some arguments and add the to_add parameter in the end. If to_add is NULL, it is equivalent to call grpc_channel_args_copy. */ -grpc_channel_args *grpc_channel_args_copy_and_add (const grpc_channel_args * src, const grpc_arg * to_add, size_t num_to_add); +grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src, + const grpc_arg *to_add, + size_t num_to_add); /** Copy args from a then args from b into a new channel args */ -grpc_channel_args *grpc_channel_args_merge (const grpc_channel_args * a, const grpc_channel_args * b); +grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a, + const grpc_channel_args *b); /** Destroy arguments created by grpc_channel_args_copy */ -void grpc_channel_args_destroy (grpc_channel_args * a); +void grpc_channel_args_destroy(grpc_channel_args *a); /** Reads census_enabled settings from channel args. Returns 1 if census_enabled * is specified in channel args, otherwise returns 0. */ -int grpc_channel_args_is_census_enabled (const grpc_channel_args * a); +int grpc_channel_args_is_census_enabled(const grpc_channel_args *a); /** Returns the compression algorithm set in \a a. */ -grpc_compression_algorithm grpc_channel_args_get_compression_algorithm (const grpc_channel_args * a); +grpc_compression_algorithm grpc_channel_args_get_compression_algorithm( + const grpc_channel_args *a); /** Returns a channel arg instance with compression enabled. If \a a is * non-NULL, its args are copied. N.B. GRPC_COMPRESS_NONE disables compression * for the channel. */ -grpc_channel_args *grpc_channel_args_set_compression_algorithm (grpc_channel_args * a, grpc_compression_algorithm algorithm); +grpc_channel_args *grpc_channel_args_set_compression_algorithm( + grpc_channel_args *a, grpc_compression_algorithm algorithm); /** Sets the support for the given compression algorithm. By default, all * compression algorithms are enabled. It's an error to disable an algorithm set @@ -69,13 +74,15 @@ grpc_channel_args *grpc_channel_args_set_compression_algorithm (grpc_channel_arg * Returns an instance will the updated algorithm states. The \a a pointer is * modified to point to the returned instance (which may be different from the * input value of \a a). */ -grpc_channel_args *grpc_channel_args_compression_algorithm_set_state (grpc_channel_args ** a, grpc_compression_algorithm algorithm, int enabled); +grpc_channel_args *grpc_channel_args_compression_algorithm_set_state( + grpc_channel_args **a, grpc_compression_algorithm algorithm, int enabled); /** Returns the bitset representing the support state (true for enabled, false * for disabled) for compression algorithms. * * The i-th bit of the returned bitset corresponds to the i-th entry in the * grpc_compression_algorithm enum. */ -int grpc_channel_args_compression_algorithm_get_states (const grpc_channel_args * a); +int grpc_channel_args_compression_algorithm_get_states( + const grpc_channel_args *a); #endif /* GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_ARGS_H */ diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index 67ab2c535f..abd7f719e7 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -59,20 +59,21 @@ int grpc_trace_channel = 0; #define ROUND_UP_TO_ALIGNMENT_SIZE(x) \ (((x) + GPR_MAX_ALIGNMENT - 1u) & ~(GPR_MAX_ALIGNMENT - 1u)) -size_t -grpc_channel_stack_size (const grpc_channel_filter ** filters, size_t filter_count) -{ +size_t grpc_channel_stack_size(const grpc_channel_filter **filters, + size_t filter_count) { /* always need the header, and size for the channel elements */ - size_t size = ROUND_UP_TO_ALIGNMENT_SIZE (sizeof (grpc_channel_stack)) + ROUND_UP_TO_ALIGNMENT_SIZE (filter_count * sizeof (grpc_channel_element)); + size_t size = + ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_channel_stack)) + + ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_channel_element)); size_t i; - GPR_ASSERT ((GPR_MAX_ALIGNMENT & (GPR_MAX_ALIGNMENT - 1)) == 0 && "GPR_MAX_ALIGNMENT must be a power of two"); + GPR_ASSERT((GPR_MAX_ALIGNMENT & (GPR_MAX_ALIGNMENT - 1)) == 0 && + "GPR_MAX_ALIGNMENT must be a power of two"); /* add the size for each filter */ - for (i = 0; i < filter_count; i++) - { - size += ROUND_UP_TO_ALIGNMENT_SIZE (filters[i]->sizeof_channel_data); - } + for (i = 0; i < filter_count; i++) { + size += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_channel_data); + } return size; } @@ -85,142 +86,142 @@ grpc_channel_stack_size (const grpc_channel_filter ** filters, size_t filter_cou ((grpc_call_element *)((char *)(stk) + \ ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)))) -grpc_channel_element * -grpc_channel_stack_element (grpc_channel_stack * channel_stack, size_t index) -{ - return CHANNEL_ELEMS_FROM_STACK (channel_stack) + index; +grpc_channel_element *grpc_channel_stack_element( + grpc_channel_stack *channel_stack, size_t index) { + return CHANNEL_ELEMS_FROM_STACK(channel_stack) + index; } -grpc_channel_element * -grpc_channel_stack_last_element (grpc_channel_stack * channel_stack) -{ - return grpc_channel_stack_element (channel_stack, channel_stack->count - 1); +grpc_channel_element *grpc_channel_stack_last_element( + grpc_channel_stack *channel_stack) { + return grpc_channel_stack_element(channel_stack, channel_stack->count - 1); } -grpc_call_element * -grpc_call_stack_element (grpc_call_stack * call_stack, size_t index) -{ - return CALL_ELEMS_FROM_STACK (call_stack) + index; +grpc_call_element *grpc_call_stack_element(grpc_call_stack *call_stack, + size_t index) { + return CALL_ELEMS_FROM_STACK(call_stack) + index; } -void -grpc_channel_stack_init (grpc_exec_ctx * exec_ctx, const grpc_channel_filter ** filters, size_t filter_count, grpc_channel * master, const grpc_channel_args * args, grpc_mdctx * metadata_context, grpc_channel_stack * stack) -{ - size_t call_size = ROUND_UP_TO_ALIGNMENT_SIZE (sizeof (grpc_call_stack)) + ROUND_UP_TO_ALIGNMENT_SIZE (filter_count * sizeof (grpc_call_element)); +void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, + const grpc_channel_filter **filters, + size_t filter_count, grpc_channel *master, + const grpc_channel_args *args, + grpc_mdctx *metadata_context, + grpc_channel_stack *stack) { + size_t call_size = + ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) + + ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_call_element)); grpc_channel_element *elems; char *user_data; size_t i; stack->count = filter_count; - elems = CHANNEL_ELEMS_FROM_STACK (stack); - user_data = ((char *) elems) + ROUND_UP_TO_ALIGNMENT_SIZE (filter_count * sizeof (grpc_channel_element)); + elems = CHANNEL_ELEMS_FROM_STACK(stack); + user_data = + ((char *)elems) + + ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_channel_element)); /* init per-filter data */ - for (i = 0; i < filter_count; i++) - { - elems[i].filter = filters[i]; - elems[i].channel_data = user_data; - elems[i].filter->init_channel_elem (exec_ctx, &elems[i], master, args, metadata_context, i == 0, i == (filter_count - 1)); - user_data += ROUND_UP_TO_ALIGNMENT_SIZE (filters[i]->sizeof_channel_data); - call_size += ROUND_UP_TO_ALIGNMENT_SIZE (filters[i]->sizeof_call_data); - } - - GPR_ASSERT (user_data > (char *) stack); - GPR_ASSERT ((gpr_uintptr) (user_data - (char *) stack) == grpc_channel_stack_size (filters, filter_count)); + for (i = 0; i < filter_count; i++) { + elems[i].filter = filters[i]; + elems[i].channel_data = user_data; + elems[i].filter->init_channel_elem(exec_ctx, &elems[i], master, args, + metadata_context, i == 0, + i == (filter_count - 1)); + user_data += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_channel_data); + call_size += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_call_data); + } + + GPR_ASSERT(user_data > (char *)stack); + GPR_ASSERT((gpr_uintptr)(user_data - (char *)stack) == + grpc_channel_stack_size(filters, filter_count)); stack->call_stack_size = call_size; } -void -grpc_channel_stack_destroy (grpc_exec_ctx * exec_ctx, grpc_channel_stack * stack) -{ - grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK (stack); +void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx, + grpc_channel_stack *stack) { + grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(stack); size_t count = stack->count; size_t i; /* destroy per-filter data */ - for (i = 0; i < count; i++) - { - channel_elems[i].filter->destroy_channel_elem (exec_ctx, &channel_elems[i]); - } + for (i = 0; i < count; i++) { + channel_elems[i].filter->destroy_channel_elem(exec_ctx, &channel_elems[i]); + } } -void -grpc_call_stack_init (grpc_exec_ctx * exec_ctx, grpc_channel_stack * channel_stack, const void *transport_server_data, grpc_transport_stream_op * initial_op, grpc_call_stack * call_stack) -{ - grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK (channel_stack); +void grpc_call_stack_init(grpc_exec_ctx *exec_ctx, + grpc_channel_stack *channel_stack, + const void *transport_server_data, + grpc_transport_stream_op *initial_op, + grpc_call_stack *call_stack) { + grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack); size_t count = channel_stack->count; grpc_call_element *call_elems; char *user_data; size_t i; call_stack->count = count; - call_elems = CALL_ELEMS_FROM_STACK (call_stack); - user_data = ((char *) call_elems) + ROUND_UP_TO_ALIGNMENT_SIZE (count * sizeof (grpc_call_element)); + call_elems = CALL_ELEMS_FROM_STACK(call_stack); + user_data = ((char *)call_elems) + + ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element)); /* init per-filter data */ - for (i = 0; i < count; i++) - { - call_elems[i].filter = channel_elems[i].filter; - call_elems[i].channel_data = channel_elems[i].channel_data; - call_elems[i].call_data = user_data; - call_elems[i].filter->init_call_elem (exec_ctx, &call_elems[i], transport_server_data, initial_op); - user_data += ROUND_UP_TO_ALIGNMENT_SIZE (call_elems[i].filter->sizeof_call_data); - } + for (i = 0; i < count; i++) { + call_elems[i].filter = channel_elems[i].filter; + call_elems[i].channel_data = channel_elems[i].channel_data; + call_elems[i].call_data = user_data; + call_elems[i].filter->init_call_elem(exec_ctx, &call_elems[i], + transport_server_data, initial_op); + user_data += + ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data); + } } -void -grpc_call_stack_destroy (grpc_exec_ctx * exec_ctx, grpc_call_stack * stack) -{ - grpc_call_element *elems = CALL_ELEMS_FROM_STACK (stack); +void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack) { + grpc_call_element *elems = CALL_ELEMS_FROM_STACK(stack); size_t count = stack->count; size_t i; /* destroy per-filter data */ - for (i = 0; i < count; i++) - { - elems[i].filter->destroy_call_elem (exec_ctx, &elems[i]); - } + for (i = 0; i < count; i++) { + elems[i].filter->destroy_call_elem(exec_ctx, &elems[i]); + } } -void -grpc_call_next_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op) -{ +void grpc_call_next_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op *op) { grpc_call_element *next_elem = elem + 1; - next_elem->filter->start_transport_stream_op (exec_ctx, next_elem, op); + next_elem->filter->start_transport_stream_op(exec_ctx, next_elem, op); } -char * -grpc_call_next_get_peer (grpc_exec_ctx * exec_ctx, grpc_call_element * elem) -{ +char *grpc_call_next_get_peer(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) { grpc_call_element *next_elem = elem + 1; - return next_elem->filter->get_peer (exec_ctx, next_elem); + return next_elem->filter->get_peer(exec_ctx, next_elem); } -void -grpc_channel_next_op (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_transport_op * op) -{ +void grpc_channel_next_op(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_transport_op *op) { grpc_channel_element *next_elem = elem + 1; - next_elem->filter->start_transport_op (exec_ctx, next_elem, op); + next_elem->filter->start_transport_op(exec_ctx, next_elem, op); } -grpc_channel_stack * -grpc_channel_stack_from_top_element (grpc_channel_element * elem) -{ - return (grpc_channel_stack *) ((char *) (elem) - ROUND_UP_TO_ALIGNMENT_SIZE (sizeof (grpc_channel_stack))); +grpc_channel_stack *grpc_channel_stack_from_top_element( + grpc_channel_element *elem) { + return (grpc_channel_stack *)((char *)(elem)-ROUND_UP_TO_ALIGNMENT_SIZE( + sizeof(grpc_channel_stack))); } -grpc_call_stack * -grpc_call_stack_from_top_element (grpc_call_element * elem) -{ - return (grpc_call_stack *) ((char *) (elem) - ROUND_UP_TO_ALIGNMENT_SIZE (sizeof (grpc_call_stack))); +grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) { + return (grpc_call_stack *)((char *)(elem)-ROUND_UP_TO_ALIGNMENT_SIZE( + sizeof(grpc_call_stack))); } -void -grpc_call_element_send_cancel (grpc_exec_ctx * exec_ctx, grpc_call_element * cur_elem) -{ +void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx, + grpc_call_element *cur_elem) { grpc_transport_stream_op op; - memset (&op, 0, sizeof (op)); + memset(&op, 0, sizeof(op)); op.cancel_with_status = GRPC_STATUS_CANCELLED; - grpc_call_next_op (exec_ctx, cur_elem, &op); + grpc_call_next_op(exec_ctx, cur_elem, &op); } diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index 0bfe326976..6732cc3018 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -61,15 +61,17 @@ typedef struct grpc_call_element grpc_call_element; 4. a name, which is useful when debugging Members are laid out in approximate frequency of use order. */ -typedef struct -{ +typedef struct { /* Called to eg. send/receive data on a call. See grpc_call_next_op on how to call the next element in the stack */ - void (*start_transport_stream_op) (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op); + void (*start_transport_stream_op)(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op); /* Called to handle channel level operations - e.g. new calls, or transport closure. See grpc_channel_next_op on how to call the next element in the stack */ - void (*start_transport_op) (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_transport_op * op); + void (*start_transport_op)(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, grpc_transport_op *op); /* sizeof(per call data) */ size_t sizeof_call_data; @@ -81,10 +83,12 @@ typedef struct on a client; if it is non-NULL, then it points to memory owned by the transport and is on the server. Most filters want to ignore this argument. */ - void (*init_call_elem) (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, const void *server_transport_data, grpc_transport_stream_op * initial_op); + void (*init_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + const void *server_transport_data, + grpc_transport_stream_op *initial_op); /* Destroy per call data. The filter does not need to do any chaining */ - void (*destroy_call_elem) (grpc_exec_ctx * exec_ctx, grpc_call_element * elem); + void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem); /* sizeof(per channel data) */ size_t sizeof_channel_data; @@ -94,13 +98,17 @@ typedef struct is_first, is_last designate this elements position in the stack, and are useful for asserting correct configuration by upper layer code. The filter does not need to do any chaining */ - void (*init_channel_elem) (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_channel * master, const grpc_channel_args * args, grpc_mdctx * metadata_context, int is_first, int is_last); + void (*init_channel_elem)(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_channel *master, const grpc_channel_args *args, + grpc_mdctx *metadata_context, int is_first, + int is_last); /* Destroy per channel data. The filter does not need to do any chaining */ - void (*destroy_channel_elem) (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem); + void (*destroy_channel_elem)(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem); /* Implement grpc_call_get_peer() */ - char *(*get_peer) (grpc_exec_ctx * exec_ctx, grpc_call_element * elem); + char *(*get_peer)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem); /* The name of this filter */ const char *name; @@ -108,8 +116,7 @@ typedef struct /* A channel_element tracks its filter and the filter requested memory within a channel allocation */ -struct grpc_channel_element -{ +struct grpc_channel_element { const grpc_channel_filter *filter; void *channel_data; }; @@ -117,8 +124,7 @@ struct grpc_channel_element /* A call_element tracks its filter, the filter requested memory within a channel allocation, and the filter requested memory within a call allocation */ -struct grpc_call_element -{ +struct grpc_call_element { const grpc_channel_filter *filter; void *channel_data; void *call_data; @@ -126,8 +132,7 @@ struct grpc_call_element /* A channel stack tracks a set of related filters for one channel, and guarantees they live within a single malloc() allocation */ -typedef struct -{ +typedef struct { size_t count; /* Memory required for a call stack (computed at channel stack initialization) */ @@ -136,48 +141,63 @@ typedef struct /* A call stack tracks a set of related filters for one call, and guarantees they live within a single malloc() allocation */ -typedef struct -{ - size_t count; -} grpc_call_stack; +typedef struct { size_t count; } grpc_call_stack; /* Get a channel element given a channel stack and its index */ -grpc_channel_element *grpc_channel_stack_element (grpc_channel_stack * stack, size_t i); +grpc_channel_element *grpc_channel_stack_element(grpc_channel_stack *stack, + size_t i); /* Get the last channel element in a channel stack */ -grpc_channel_element *grpc_channel_stack_last_element (grpc_channel_stack * stack); +grpc_channel_element *grpc_channel_stack_last_element( + grpc_channel_stack *stack); /* Get a call stack element given a call stack and an index */ -grpc_call_element *grpc_call_stack_element (grpc_call_stack * stack, size_t i); +grpc_call_element *grpc_call_stack_element(grpc_call_stack *stack, size_t i); /* Determine memory required for a channel stack containing a set of filters */ -size_t grpc_channel_stack_size (const grpc_channel_filter ** filters, size_t filter_count); +size_t grpc_channel_stack_size(const grpc_channel_filter **filters, + size_t filter_count); /* Initialize a channel stack given some filters */ -void grpc_channel_stack_init (grpc_exec_ctx * exec_ctx, const grpc_channel_filter ** filters, size_t filter_count, grpc_channel * master, const grpc_channel_args * args, grpc_mdctx * metadata_context, grpc_channel_stack * stack); +void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, + const grpc_channel_filter **filters, + size_t filter_count, grpc_channel *master, + const grpc_channel_args *args, + grpc_mdctx *metadata_context, + grpc_channel_stack *stack); /* Destroy a channel stack */ -void grpc_channel_stack_destroy (grpc_exec_ctx * exec_ctx, grpc_channel_stack * stack); +void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx, + grpc_channel_stack *stack); /* Initialize a call stack given a channel stack. transport_server_data is expected to be NULL on a client, or an opaque transport owned pointer on the server. */ -void grpc_call_stack_init (grpc_exec_ctx * exec_ctx, grpc_channel_stack * channel_stack, const void *transport_server_data, grpc_transport_stream_op * initial_op, grpc_call_stack * call_stack); +void grpc_call_stack_init(grpc_exec_ctx *exec_ctx, + grpc_channel_stack *channel_stack, + const void *transport_server_data, + grpc_transport_stream_op *initial_op, + grpc_call_stack *call_stack); /* Destroy a call stack */ -void grpc_call_stack_destroy (grpc_exec_ctx * exec_ctx, grpc_call_stack * stack); +void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack); /* Call the next operation in a call stack */ -void grpc_call_next_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op); +void grpc_call_next_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op *op); /* Call the next operation (depending on call directionality) in a channel stack */ -void grpc_channel_next_op (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_transport_op * op); +void grpc_channel_next_op(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_transport_op *op); /* Pass through a request to get_peer to the next child element */ -char *grpc_call_next_get_peer (grpc_exec_ctx * exec_ctx, grpc_call_element * elem); +char *grpc_call_next_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem); /* Given the top element of a channel stack, get the channel stack itself */ -grpc_channel_stack *grpc_channel_stack_from_top_element (grpc_channel_element * elem); +grpc_channel_stack *grpc_channel_stack_from_top_element( + grpc_channel_element *elem); /* Given the top element of a call stack, get the call stack itself */ -grpc_call_stack *grpc_call_stack_from_top_element (grpc_call_element * elem); +grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem); -void grpc_call_log_op (char *file, int line, gpr_log_severity severity, grpc_call_element * elem, grpc_transport_stream_op * op); +void grpc_call_log_op(char *file, int line, gpr_log_severity severity, + grpc_call_element *elem, grpc_transport_stream_op *op); -void grpc_call_element_send_cancel (grpc_exec_ctx * exec_ctx, grpc_call_element * cur_elem); +void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx, + grpc_call_element *cur_elem); extern int grpc_trace_channel; diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 97f097d301..ce03ad5eb0 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -51,8 +51,7 @@ typedef struct call_data call_data; -typedef struct -{ +typedef struct { /** metadata context for this channel */ grpc_mdctx *mdctx; /** resolver for this channel */ @@ -90,16 +89,14 @@ typedef struct to watch for state changes from the lb_policy. When a state change is seen, we update the channel, and create a new watcher */ -typedef struct -{ +typedef struct { channel_data *chand; grpc_closure on_changed; grpc_connectivity_state state; grpc_lb_policy *lb_policy; } lb_policy_connectivity_watcher; -typedef enum -{ +typedef enum { CALL_CREATED, CALL_WAITING_FOR_SEND, CALL_WAITING_FOR_CONFIG, @@ -109,8 +106,7 @@ typedef enum CALL_CANCELLED } call_state; -struct call_data -{ +struct call_data { /* owning element */ grpc_call_element *elem; @@ -127,406 +123,361 @@ struct call_data grpc_linked_mdelem details; }; -static grpc_closure * -merge_into_waiting_op (grpc_call_element * elem, grpc_transport_stream_op * new_op) - GRPC_MUST_USE_RESULT; +static grpc_closure *merge_into_waiting_op(grpc_call_element *elem, + grpc_transport_stream_op *new_op) + GRPC_MUST_USE_RESULT; - static void handle_op_after_cancellation (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op) -{ +static void handle_op_after_cancellation(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - if (op->send_ops) - { - grpc_stream_ops_unref_owned_objects (op->send_ops->ops, op->send_ops->nops); - op->on_done_send->cb (exec_ctx, op->on_done_send->cb_arg, 0); - } - if (op->recv_ops) - { - char status[GPR_LTOA_MIN_BUFSIZE]; - grpc_metadata_batch mdb; - gpr_ltoa (GRPC_STATUS_CANCELLED, status); - calld->status.md = grpc_mdelem_from_strings (chand->mdctx, "grpc-status", status); - calld->details.md = grpc_mdelem_from_strings (chand->mdctx, "grpc-message", "Cancelled"); - calld->status.prev = calld->details.next = NULL; - calld->status.next = &calld->details; - calld->details.prev = &calld->status; - mdb.list.head = &calld->status; - mdb.list.tail = &calld->details; - mdb.garbage.head = mdb.garbage.tail = NULL; - mdb.deadline = gpr_inf_future (GPR_CLOCK_REALTIME); - grpc_sopb_add_metadata (op->recv_ops, mdb); - *op->recv_state = GRPC_STREAM_CLOSED; - op->on_done_recv->cb (exec_ctx, op->on_done_recv->cb_arg, 1); - } - if (op->on_consumed) - { - op->on_consumed->cb (exec_ctx, op->on_consumed->cb_arg, 0); - } + if (op->send_ops) { + grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops); + op->on_done_send->cb(exec_ctx, op->on_done_send->cb_arg, 0); + } + if (op->recv_ops) { + char status[GPR_LTOA_MIN_BUFSIZE]; + grpc_metadata_batch mdb; + gpr_ltoa(GRPC_STATUS_CANCELLED, status); + calld->status.md = + grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status); + calld->details.md = + grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled"); + calld->status.prev = calld->details.next = NULL; + calld->status.next = &calld->details; + calld->details.prev = &calld->status; + mdb.list.head = &calld->status; + mdb.list.tail = &calld->details; + mdb.garbage.head = mdb.garbage.tail = NULL; + mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME); + grpc_sopb_add_metadata(op->recv_ops, mdb); + *op->recv_state = GRPC_STREAM_CLOSED; + op->on_done_recv->cb(exec_ctx, op->on_done_recv->cb_arg, 1); + } + if (op->on_consumed) { + op->on_consumed->cb(exec_ctx, op->on_consumed->cb_arg, 0); + } } -typedef struct -{ +typedef struct { grpc_closure closure; grpc_call_element *elem; } waiting_call; -static void perform_transport_stream_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op, int continuation); +static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op, + int continuation); -static void -continue_with_pick (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) -{ +static void continue_with_pick(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { waiting_call *wc = arg; call_data *calld = wc->elem->call_data; - perform_transport_stream_op (exec_ctx, wc->elem, &calld->waiting_op, 1); - gpr_free (wc); + perform_transport_stream_op(exec_ctx, wc->elem, &calld->waiting_op, 1); + gpr_free(wc); } -static void -add_to_lb_policy_wait_queue_locked_state_config (grpc_call_element * elem) -{ +static void add_to_lb_policy_wait_queue_locked_state_config( + grpc_call_element *elem) { channel_data *chand = elem->channel_data; - waiting_call *wc = gpr_malloc (sizeof (*wc)); - grpc_closure_init (&wc->closure, continue_with_pick, wc); + waiting_call *wc = gpr_malloc(sizeof(*wc)); + grpc_closure_init(&wc->closure, continue_with_pick, wc); wc->elem = elem; - grpc_closure_list_add (&chand->waiting_for_config_closures, &wc->closure, 1); + grpc_closure_list_add(&chand->waiting_for_config_closures, &wc->closure, 1); } -static int -is_empty (void *p, int len) -{ +static int is_empty(void *p, int len) { char *ptr = p; int i; - for (i = 0; i < len; i++) - { - if (ptr[i] != 0) - return 0; - } + for (i = 0; i < len; i++) { + if (ptr[i] != 0) return 0; + } return 1; } -static void -started_call (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) -{ +static void started_call(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { call_data *calld = arg; grpc_transport_stream_op op; int have_waiting; - gpr_mu_lock (&calld->mu_state); - if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) - { - memset (&op, 0, sizeof (op)); - op.cancel_with_status = GRPC_STATUS_CANCELLED; - gpr_mu_unlock (&calld->mu_state); - grpc_subchannel_call_process_op (exec_ctx, calld->subchannel_call, &op); - } - else if (calld->state == CALL_WAITING_FOR_CALL) - { - have_waiting = !is_empty (&calld->waiting_op, sizeof (calld->waiting_op)); - if (calld->subchannel_call != NULL) - { - calld->state = CALL_ACTIVE; - gpr_mu_unlock (&calld->mu_state); - if (have_waiting) - { - grpc_subchannel_call_process_op (exec_ctx, calld->subchannel_call, &calld->waiting_op); - } - } - else - { - calld->state = CALL_CANCELLED; - gpr_mu_unlock (&calld->mu_state); - if (have_waiting) - { - handle_op_after_cancellation (exec_ctx, calld->elem, &calld->waiting_op); - } - } - } - else - { - GPR_ASSERT (calld->state == CALL_CANCELLED); - gpr_mu_unlock (&calld->mu_state); - } + gpr_mu_lock(&calld->mu_state); + if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) { + memset(&op, 0, sizeof(op)); + op.cancel_with_status = GRPC_STATUS_CANCELLED; + gpr_mu_unlock(&calld->mu_state); + grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, &op); + } else if (calld->state == CALL_WAITING_FOR_CALL) { + have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op)); + if (calld->subchannel_call != NULL) { + calld->state = CALL_ACTIVE; + gpr_mu_unlock(&calld->mu_state); + if (have_waiting) { + grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, + &calld->waiting_op); + } + } else { + calld->state = CALL_CANCELLED; + gpr_mu_unlock(&calld->mu_state); + if (have_waiting) { + handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op); + } + } + } else { + GPR_ASSERT(calld->state == CALL_CANCELLED); + gpr_mu_unlock(&calld->mu_state); + } } -static void -picked_target (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) -{ +static void picked_target(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { call_data *calld = arg; grpc_pollset *pollset; - if (calld->picked_channel == NULL) - { - /* treat this like a cancellation */ - calld->waiting_op.cancel_with_status = GRPC_STATUS_UNAVAILABLE; - perform_transport_stream_op (exec_ctx, calld->elem, &calld->waiting_op, 1); - } - else - { - gpr_mu_lock (&calld->mu_state); - if (calld->state == CALL_CANCELLED) - { - gpr_mu_unlock (&calld->mu_state); - handle_op_after_cancellation (exec_ctx, calld->elem, &calld->waiting_op); - } - else - { - GPR_ASSERT (calld->state == CALL_WAITING_FOR_PICK); - calld->state = CALL_WAITING_FOR_CALL; - pollset = calld->waiting_op.bind_pollset; - gpr_mu_unlock (&calld->mu_state); - grpc_closure_init (&calld->async_setup_task, started_call, calld); - grpc_subchannel_create_call (exec_ctx, calld->picked_channel, pollset, &calld->subchannel_call, &calld->async_setup_task); - } - } + if (calld->picked_channel == NULL) { + /* treat this like a cancellation */ + calld->waiting_op.cancel_with_status = GRPC_STATUS_UNAVAILABLE; + perform_transport_stream_op(exec_ctx, calld->elem, &calld->waiting_op, 1); + } else { + gpr_mu_lock(&calld->mu_state); + if (calld->state == CALL_CANCELLED) { + gpr_mu_unlock(&calld->mu_state); + handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op); + } else { + GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK); + calld->state = CALL_WAITING_FOR_CALL; + pollset = calld->waiting_op.bind_pollset; + gpr_mu_unlock(&calld->mu_state); + grpc_closure_init(&calld->async_setup_task, started_call, calld); + grpc_subchannel_create_call(exec_ctx, calld->picked_channel, pollset, + &calld->subchannel_call, + &calld->async_setup_task); + } + } } -static grpc_closure * -merge_into_waiting_op (grpc_call_element * elem, grpc_transport_stream_op * new_op) -{ +static grpc_closure *merge_into_waiting_op(grpc_call_element *elem, + grpc_transport_stream_op *new_op) { call_data *calld = elem->call_data; grpc_closure *consumed_op = NULL; grpc_transport_stream_op *waiting_op = &calld->waiting_op; - GPR_ASSERT ((waiting_op->send_ops != NULL) + (new_op->send_ops != NULL) <= 1); - GPR_ASSERT ((waiting_op->recv_ops != NULL) + (new_op->recv_ops != NULL) <= 1); - if (new_op->send_ops != NULL) - { - waiting_op->send_ops = new_op->send_ops; - waiting_op->is_last_send = new_op->is_last_send; - waiting_op->on_done_send = new_op->on_done_send; - } - if (new_op->recv_ops != NULL) - { - waiting_op->recv_ops = new_op->recv_ops; - waiting_op->recv_state = new_op->recv_state; - waiting_op->on_done_recv = new_op->on_done_recv; - } - if (new_op->on_consumed != NULL) - { - if (waiting_op->on_consumed != NULL) - { - consumed_op = waiting_op->on_consumed; - } - waiting_op->on_consumed = new_op->on_consumed; - } - if (new_op->cancel_with_status != GRPC_STATUS_OK) - { - waiting_op->cancel_with_status = new_op->cancel_with_status; - } + GPR_ASSERT((waiting_op->send_ops != NULL) + (new_op->send_ops != NULL) <= 1); + GPR_ASSERT((waiting_op->recv_ops != NULL) + (new_op->recv_ops != NULL) <= 1); + if (new_op->send_ops != NULL) { + waiting_op->send_ops = new_op->send_ops; + waiting_op->is_last_send = new_op->is_last_send; + waiting_op->on_done_send = new_op->on_done_send; + } + if (new_op->recv_ops != NULL) { + waiting_op->recv_ops = new_op->recv_ops; + waiting_op->recv_state = new_op->recv_state; + waiting_op->on_done_recv = new_op->on_done_recv; + } + if (new_op->on_consumed != NULL) { + if (waiting_op->on_consumed != NULL) { + consumed_op = waiting_op->on_consumed; + } + waiting_op->on_consumed = new_op->on_consumed; + } + if (new_op->cancel_with_status != GRPC_STATUS_OK) { + waiting_op->cancel_with_status = new_op->cancel_with_status; + } return consumed_op; } -static char * -cc_get_peer (grpc_exec_ctx * exec_ctx, grpc_call_element * elem) -{ +static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; grpc_subchannel_call *subchannel_call; char *result; - gpr_mu_lock (&calld->mu_state); - if (calld->state == CALL_ACTIVE) - { - subchannel_call = calld->subchannel_call; - GRPC_SUBCHANNEL_CALL_REF (subchannel_call, "get_peer"); - gpr_mu_unlock (&calld->mu_state); - result = grpc_subchannel_call_get_peer (exec_ctx, subchannel_call); - GRPC_SUBCHANNEL_CALL_UNREF (exec_ctx, subchannel_call, "get_peer"); - return result; - } - else - { - gpr_mu_unlock (&calld->mu_state); - return grpc_channel_get_target (chand->master); - } + gpr_mu_lock(&calld->mu_state); + if (calld->state == CALL_ACTIVE) { + subchannel_call = calld->subchannel_call; + GRPC_SUBCHANNEL_CALL_REF(subchannel_call, "get_peer"); + gpr_mu_unlock(&calld->mu_state); + result = grpc_subchannel_call_get_peer(exec_ctx, subchannel_call); + GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "get_peer"); + return result; + } else { + gpr_mu_unlock(&calld->mu_state); + return grpc_channel_get_target(chand->master); + } } -static void -perform_transport_stream_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op, int continuation) -{ +static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op, + int continuation) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; grpc_subchannel_call *subchannel_call; grpc_lb_policy *lb_policy; grpc_transport_stream_op op2; - GPR_ASSERT (elem->filter == &grpc_client_channel_filter); - GRPC_CALL_LOG_OP (GPR_INFO, elem, op); + GPR_ASSERT(elem->filter == &grpc_client_channel_filter); + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - gpr_mu_lock (&calld->mu_state); - switch (calld->state) - { + gpr_mu_lock(&calld->mu_state); + switch (calld->state) { case CALL_ACTIVE: - GPR_ASSERT (!continuation); + GPR_ASSERT(!continuation); subchannel_call = calld->subchannel_call; - gpr_mu_unlock (&calld->mu_state); - grpc_subchannel_call_process_op (exec_ctx, subchannel_call, op); + gpr_mu_unlock(&calld->mu_state); + grpc_subchannel_call_process_op(exec_ctx, subchannel_call, op); break; case CALL_CANCELLED: - gpr_mu_unlock (&calld->mu_state); - handle_op_after_cancellation (exec_ctx, elem, op); + gpr_mu_unlock(&calld->mu_state); + handle_op_after_cancellation(exec_ctx, elem, op); break; case CALL_WAITING_FOR_SEND: - GPR_ASSERT (!continuation); - grpc_exec_ctx_enqueue (exec_ctx, merge_into_waiting_op (elem, op), 1); - if (!calld->waiting_op.send_ops && calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) - { - gpr_mu_unlock (&calld->mu_state); - break; - } + GPR_ASSERT(!continuation); + grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1); + if (!calld->waiting_op.send_ops && + calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) { + gpr_mu_unlock(&calld->mu_state); + break; + } *op = calld->waiting_op; - memset (&calld->waiting_op, 0, sizeof (calld->waiting_op)); + memset(&calld->waiting_op, 0, sizeof(calld->waiting_op)); continuation = 1; - /* fall through */ + /* fall through */ case CALL_WAITING_FOR_CONFIG: case CALL_WAITING_FOR_PICK: case CALL_WAITING_FOR_CALL: - if (!continuation) - { - if (op->cancel_with_status != GRPC_STATUS_OK) - { - calld->state = CALL_CANCELLED; - op2 = calld->waiting_op; - memset (&calld->waiting_op, 0, sizeof (calld->waiting_op)); - if (op->on_consumed) - { - calld->waiting_op.on_consumed = op->on_consumed; - op->on_consumed = NULL; - } - else if (op2.on_consumed) - { - calld->waiting_op.on_consumed = op2.on_consumed; - op2.on_consumed = NULL; - } - gpr_mu_unlock (&calld->mu_state); - handle_op_after_cancellation (exec_ctx, elem, op); - handle_op_after_cancellation (exec_ctx, elem, &op2); - } - else - { - grpc_exec_ctx_enqueue (exec_ctx, merge_into_waiting_op (elem, op), 1); - gpr_mu_unlock (&calld->mu_state); - } - break; - } - /* fall through */ + if (!continuation) { + if (op->cancel_with_status != GRPC_STATUS_OK) { + calld->state = CALL_CANCELLED; + op2 = calld->waiting_op; + memset(&calld->waiting_op, 0, sizeof(calld->waiting_op)); + if (op->on_consumed) { + calld->waiting_op.on_consumed = op->on_consumed; + op->on_consumed = NULL; + } else if (op2.on_consumed) { + calld->waiting_op.on_consumed = op2.on_consumed; + op2.on_consumed = NULL; + } + gpr_mu_unlock(&calld->mu_state); + handle_op_after_cancellation(exec_ctx, elem, op); + handle_op_after_cancellation(exec_ctx, elem, &op2); + } else { + grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1); + gpr_mu_unlock(&calld->mu_state); + } + break; + } + /* fall through */ case CALL_CREATED: - if (op->cancel_with_status != GRPC_STATUS_OK) - { - calld->state = CALL_CANCELLED; - gpr_mu_unlock (&calld->mu_state); - handle_op_after_cancellation (exec_ctx, elem, op); - } - else - { - calld->waiting_op = *op; - - if (op->send_ops == NULL) - { - /* need to have some send ops before we can select the - lb target */ - calld->state = CALL_WAITING_FOR_SEND; - gpr_mu_unlock (&calld->mu_state); - } - else - { - gpr_mu_lock (&chand->mu_config); - lb_policy = chand->lb_policy; - if (lb_policy) - { - grpc_transport_stream_op *op = &calld->waiting_op; - grpc_pollset *bind_pollset = op->bind_pollset; - grpc_metadata_batch *initial_metadata = &op->send_ops->ops[0].data.metadata; - GRPC_LB_POLICY_REF (lb_policy, "pick"); - gpr_mu_unlock (&chand->mu_config); - calld->state = CALL_WAITING_FOR_PICK; - - GPR_ASSERT (op->bind_pollset); - GPR_ASSERT (op->send_ops); - GPR_ASSERT (op->send_ops->nops >= 1); - GPR_ASSERT (op->send_ops->ops[0].type == GRPC_OP_METADATA); - gpr_mu_unlock (&calld->mu_state); - - grpc_closure_init (&calld->async_setup_task, picked_target, calld); - grpc_lb_policy_pick (exec_ctx, lb_policy, bind_pollset, initial_metadata, &calld->picked_channel, &calld->async_setup_task); - - GRPC_LB_POLICY_UNREF (exec_ctx, lb_policy, "pick"); - } - else if (chand->resolver != NULL) - { - calld->state = CALL_WAITING_FOR_CONFIG; - add_to_lb_policy_wait_queue_locked_state_config (elem); - if (!chand->started_resolving && chand->resolver != NULL) - { - GRPC_CHANNEL_INTERNAL_REF (chand->master, "resolver"); - chand->started_resolving = 1; - grpc_resolver_next (exec_ctx, chand->resolver, &chand->incoming_configuration, &chand->on_config_changed); - } - gpr_mu_unlock (&chand->mu_config); - gpr_mu_unlock (&calld->mu_state); - } - else - { - calld->state = CALL_CANCELLED; - gpr_mu_unlock (&chand->mu_config); - gpr_mu_unlock (&calld->mu_state); - handle_op_after_cancellation (exec_ctx, elem, op); - } - } - } + if (op->cancel_with_status != GRPC_STATUS_OK) { + calld->state = CALL_CANCELLED; + gpr_mu_unlock(&calld->mu_state); + handle_op_after_cancellation(exec_ctx, elem, op); + } else { + calld->waiting_op = *op; + + if (op->send_ops == NULL) { + /* need to have some send ops before we can select the + lb target */ + calld->state = CALL_WAITING_FOR_SEND; + gpr_mu_unlock(&calld->mu_state); + } else { + gpr_mu_lock(&chand->mu_config); + lb_policy = chand->lb_policy; + if (lb_policy) { + grpc_transport_stream_op *op = &calld->waiting_op; + grpc_pollset *bind_pollset = op->bind_pollset; + grpc_metadata_batch *initial_metadata = + &op->send_ops->ops[0].data.metadata; + GRPC_LB_POLICY_REF(lb_policy, "pick"); + gpr_mu_unlock(&chand->mu_config); + calld->state = CALL_WAITING_FOR_PICK; + + GPR_ASSERT(op->bind_pollset); + GPR_ASSERT(op->send_ops); + GPR_ASSERT(op->send_ops->nops >= 1); + GPR_ASSERT(op->send_ops->ops[0].type == GRPC_OP_METADATA); + gpr_mu_unlock(&calld->mu_state); + + grpc_closure_init(&calld->async_setup_task, picked_target, calld); + grpc_lb_policy_pick(exec_ctx, lb_policy, bind_pollset, + initial_metadata, &calld->picked_channel, + &calld->async_setup_task); + + GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick"); + } else if (chand->resolver != NULL) { + calld->state = CALL_WAITING_FOR_CONFIG; + add_to_lb_policy_wait_queue_locked_state_config(elem); + if (!chand->started_resolving && chand->resolver != NULL) { + GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); + chand->started_resolving = 1; + grpc_resolver_next(exec_ctx, chand->resolver, + &chand->incoming_configuration, + &chand->on_config_changed); + } + gpr_mu_unlock(&chand->mu_config); + gpr_mu_unlock(&calld->mu_state); + } else { + calld->state = CALL_CANCELLED; + gpr_mu_unlock(&chand->mu_config); + gpr_mu_unlock(&calld->mu_state); + handle_op_after_cancellation(exec_ctx, elem, op); + } + } + } break; - } + } } -static void -cc_start_transport_stream_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op) -{ - perform_transport_stream_op (exec_ctx, elem, op, 0); +static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op) { + perform_transport_stream_op(exec_ctx, elem, op, 0); } -static void watch_lb_policy (grpc_exec_ctx * exec_ctx, channel_data * chand, grpc_lb_policy * lb_policy, grpc_connectivity_state current_state); +static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, + grpc_lb_policy *lb_policy, + grpc_connectivity_state current_state); -static void -on_lb_policy_state_changed_locked (grpc_exec_ctx * exec_ctx, lb_policy_connectivity_watcher * w) -{ +static void on_lb_policy_state_changed_locked( + grpc_exec_ctx *exec_ctx, lb_policy_connectivity_watcher *w) { /* check if the notification is for a stale policy */ - if (w->lb_policy != w->chand->lb_policy) - return; + if (w->lb_policy != w->chand->lb_policy) return; - grpc_connectivity_state_set (exec_ctx, &w->chand->state_tracker, w->state, "lb_changed"); - if (w->state != GRPC_CHANNEL_FATAL_FAILURE) - { - watch_lb_policy (exec_ctx, w->chand, w->lb_policy, w->state); - } + grpc_connectivity_state_set(exec_ctx, &w->chand->state_tracker, w->state, + "lb_changed"); + if (w->state != GRPC_CHANNEL_FATAL_FAILURE) { + watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state); + } } -static void -on_lb_policy_state_changed (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) -{ +static void on_lb_policy_state_changed(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { lb_policy_connectivity_watcher *w = arg; - gpr_mu_lock (&w->chand->mu_config); - on_lb_policy_state_changed_locked (exec_ctx, w); - gpr_mu_unlock (&w->chand->mu_config); + gpr_mu_lock(&w->chand->mu_config); + on_lb_policy_state_changed_locked(exec_ctx, w); + gpr_mu_unlock(&w->chand->mu_config); - GRPC_CHANNEL_INTERNAL_UNREF (exec_ctx, w->chand->master, "watch_lb_policy"); - gpr_free (w); + GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->chand->master, "watch_lb_policy"); + gpr_free(w); } -static void -watch_lb_policy (grpc_exec_ctx * exec_ctx, channel_data * chand, grpc_lb_policy * lb_policy, grpc_connectivity_state current_state) -{ - lb_policy_connectivity_watcher *w = gpr_malloc (sizeof (*w)); - GRPC_CHANNEL_INTERNAL_REF (chand->master, "watch_lb_policy"); +static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, + grpc_lb_policy *lb_policy, + grpc_connectivity_state current_state) { + lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w)); + GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy"); w->chand = chand; - grpc_closure_init (&w->on_changed, on_lb_policy_state_changed, w); + grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w); w->state = current_state; w->lb_policy = lb_policy; - grpc_lb_policy_notify_on_state_change (exec_ctx, lb_policy, &w->state, &w->on_changed); + grpc_lb_policy_notify_on_state_change(exec_ctx, lb_policy, &w->state, + &w->on_changed); } -static void -cc_on_config_changed (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) -{ +static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { channel_data *chand = arg; grpc_lb_policy *lb_policy = NULL; grpc_lb_policy *old_lb_policy; @@ -534,313 +485,295 @@ cc_on_config_changed (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE; int exit_idle = 0; - if (chand->incoming_configuration != NULL) - { - lb_policy = grpc_client_config_get_lb_policy (chand->incoming_configuration); - if (lb_policy != NULL) - { - GRPC_LB_POLICY_REF (lb_policy, "channel"); - GRPC_LB_POLICY_REF (lb_policy, "config_change"); - state = grpc_lb_policy_check_connectivity (exec_ctx, lb_policy); - } - - grpc_client_config_unref (exec_ctx, chand->incoming_configuration); + if (chand->incoming_configuration != NULL) { + lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration); + if (lb_policy != NULL) { + GRPC_LB_POLICY_REF(lb_policy, "channel"); + GRPC_LB_POLICY_REF(lb_policy, "config_change"); + state = grpc_lb_policy_check_connectivity(exec_ctx, lb_policy); } + grpc_client_config_unref(exec_ctx, chand->incoming_configuration); + } + chand->incoming_configuration = NULL; - gpr_mu_lock (&chand->mu_config); + gpr_mu_lock(&chand->mu_config); old_lb_policy = chand->lb_policy; chand->lb_policy = lb_policy; - if (lb_policy != NULL || chand->resolver == NULL /* disconnected */ ) - { - grpc_exec_ctx_enqueue_list (exec_ctx, &chand->waiting_for_config_closures); - } - if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) - { - GRPC_LB_POLICY_REF (lb_policy, "exit_idle"); - exit_idle = 1; - chand->exit_idle_when_lb_policy_arrives = 0; - } - - if (iomgr_success && chand->resolver) - { - grpc_resolver *resolver = chand->resolver; - GRPC_RESOLVER_REF (resolver, "channel-next"); - grpc_connectivity_state_set (exec_ctx, &chand->state_tracker, state, "new_lb+resolver"); - if (lb_policy != NULL) - { - watch_lb_policy (exec_ctx, chand, lb_policy, state); - } - gpr_mu_unlock (&chand->mu_config); - GRPC_CHANNEL_INTERNAL_REF (chand->master, "resolver"); - grpc_resolver_next (exec_ctx, resolver, &chand->incoming_configuration, &chand->on_config_changed); - GRPC_RESOLVER_UNREF (exec_ctx, resolver, "channel-next"); - } - else - { - old_resolver = chand->resolver; - chand->resolver = NULL; - grpc_connectivity_state_set (exec_ctx, &chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone"); - gpr_mu_unlock (&chand->mu_config); - if (old_resolver != NULL) - { - grpc_resolver_shutdown (exec_ctx, old_resolver); - GRPC_RESOLVER_UNREF (exec_ctx, old_resolver, "channel"); - } - } - - if (exit_idle) - { - grpc_lb_policy_exit_idle (exec_ctx, lb_policy); - GRPC_LB_POLICY_UNREF (exec_ctx, lb_policy, "exit_idle"); - } - - if (old_lb_policy != NULL) - { - grpc_lb_policy_shutdown (exec_ctx, old_lb_policy); - GRPC_LB_POLICY_UNREF (exec_ctx, old_lb_policy, "channel"); - } - - if (lb_policy != NULL) - { - GRPC_LB_POLICY_UNREF (exec_ctx, lb_policy, "config_change"); - } - - GRPC_CHANNEL_INTERNAL_UNREF (exec_ctx, chand->master, "resolver"); + if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) { + grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures); + } + if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) { + GRPC_LB_POLICY_REF(lb_policy, "exit_idle"); + exit_idle = 1; + chand->exit_idle_when_lb_policy_arrives = 0; + } + + if (iomgr_success && chand->resolver) { + grpc_resolver *resolver = chand->resolver; + GRPC_RESOLVER_REF(resolver, "channel-next"); + grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, + "new_lb+resolver"); + if (lb_policy != NULL) { + watch_lb_policy(exec_ctx, chand, lb_policy, state); + } + gpr_mu_unlock(&chand->mu_config); + GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); + grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration, + &chand->on_config_changed); + GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel-next"); + } else { + old_resolver = chand->resolver; + chand->resolver = NULL; + grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, + GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone"); + gpr_mu_unlock(&chand->mu_config); + if (old_resolver != NULL) { + grpc_resolver_shutdown(exec_ctx, old_resolver); + GRPC_RESOLVER_UNREF(exec_ctx, old_resolver, "channel"); + } + } + + if (exit_idle) { + grpc_lb_policy_exit_idle(exec_ctx, lb_policy); + GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "exit_idle"); + } + + if (old_lb_policy != NULL) { + grpc_lb_policy_shutdown(exec_ctx, old_lb_policy); + GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel"); + } + + if (lb_policy != NULL) { + GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "config_change"); + } + + GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->master, "resolver"); } -static void -cc_start_transport_op (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_transport_op * op) -{ +static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_transport_op *op) { grpc_lb_policy *lb_policy = NULL; channel_data *chand = elem->channel_data; grpc_resolver *destroy_resolver = NULL; - grpc_exec_ctx_enqueue (exec_ctx, op->on_consumed, 1); - - GPR_ASSERT (op->set_accept_stream == NULL); - GPR_ASSERT (op->bind_pollset == NULL); - - gpr_mu_lock (&chand->mu_config); - if (op->on_connectivity_state_change != NULL) - { - grpc_connectivity_state_notify_on_state_change (exec_ctx, &chand->state_tracker, op->connectivity_state, op->on_connectivity_state_change); - op->on_connectivity_state_change = NULL; - op->connectivity_state = NULL; - } - - if (!is_empty (op, sizeof (*op))) - { - lb_policy = chand->lb_policy; - if (lb_policy) - { - GRPC_LB_POLICY_REF (lb_policy, "broadcast"); - } - } - - if (op->disconnect && chand->resolver != NULL) - { - grpc_connectivity_state_set (exec_ctx, &chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "disconnect"); - destroy_resolver = chand->resolver; - chand->resolver = NULL; - if (chand->lb_policy != NULL) - { - grpc_lb_policy_shutdown (exec_ctx, chand->lb_policy); - GRPC_LB_POLICY_UNREF (exec_ctx, chand->lb_policy, "channel"); - chand->lb_policy = NULL; - } - } - gpr_mu_unlock (&chand->mu_config); - - if (destroy_resolver) - { - grpc_resolver_shutdown (exec_ctx, destroy_resolver); - GRPC_RESOLVER_UNREF (exec_ctx, destroy_resolver, "channel"); - } - - if (lb_policy) - { - grpc_lb_policy_broadcast (exec_ctx, lb_policy, op); - GRPC_LB_POLICY_UNREF (exec_ctx, lb_policy, "broadcast"); - } + grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1); + + GPR_ASSERT(op->set_accept_stream == NULL); + GPR_ASSERT(op->bind_pollset == NULL); + + gpr_mu_lock(&chand->mu_config); + if (op->on_connectivity_state_change != NULL) { + grpc_connectivity_state_notify_on_state_change( + exec_ctx, &chand->state_tracker, op->connectivity_state, + op->on_connectivity_state_change); + op->on_connectivity_state_change = NULL; + op->connectivity_state = NULL; + } + + if (!is_empty(op, sizeof(*op))) { + lb_policy = chand->lb_policy; + if (lb_policy) { + GRPC_LB_POLICY_REF(lb_policy, "broadcast"); + } + } + + if (op->disconnect && chand->resolver != NULL) { + grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, + GRPC_CHANNEL_FATAL_FAILURE, "disconnect"); + destroy_resolver = chand->resolver; + chand->resolver = NULL; + if (chand->lb_policy != NULL) { + grpc_lb_policy_shutdown(exec_ctx, chand->lb_policy); + GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); + chand->lb_policy = NULL; + } + } + gpr_mu_unlock(&chand->mu_config); + + if (destroy_resolver) { + grpc_resolver_shutdown(exec_ctx, destroy_resolver); + GRPC_RESOLVER_UNREF(exec_ctx, destroy_resolver, "channel"); + } + + if (lb_policy) { + grpc_lb_policy_broadcast(exec_ctx, lb_policy, op); + GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "broadcast"); + } } /* Constructor for call_data */ -static void -init_call_elem (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, const void *server_transport_data, grpc_transport_stream_op * initial_op) -{ +static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + const void *server_transport_data, + grpc_transport_stream_op *initial_op) { call_data *calld = elem->call_data; /* TODO(ctiller): is there something useful we can do here? */ - GPR_ASSERT (initial_op == NULL); + GPR_ASSERT(initial_op == NULL); - GPR_ASSERT (elem->filter == &grpc_client_channel_filter); - GPR_ASSERT (server_transport_data == NULL); - gpr_mu_init (&calld->mu_state); + GPR_ASSERT(elem->filter == &grpc_client_channel_filter); + GPR_ASSERT(server_transport_data == NULL); + gpr_mu_init(&calld->mu_state); calld->elem = elem; calld->state = CALL_CREATED; - calld->deadline = gpr_inf_future (GPR_CLOCK_REALTIME); + calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); } /* Destructor for call_data */ -static void -destroy_call_elem (grpc_exec_ctx * exec_ctx, grpc_call_element * elem) -{ +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) { call_data *calld = elem->call_data; grpc_subchannel_call *subchannel_call; /* if the call got activated, we need to destroy the child stack also, and remove it from the in-flight requests tracked by the child_entry we picked */ - gpr_mu_lock (&calld->mu_state); - switch (calld->state) - { + gpr_mu_lock(&calld->mu_state); + switch (calld->state) { case CALL_ACTIVE: subchannel_call = calld->subchannel_call; - gpr_mu_unlock (&calld->mu_state); - GRPC_SUBCHANNEL_CALL_UNREF (exec_ctx, subchannel_call, "client_channel"); + gpr_mu_unlock(&calld->mu_state); + GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "client_channel"); break; case CALL_CREATED: case CALL_CANCELLED: - gpr_mu_unlock (&calld->mu_state); + gpr_mu_unlock(&calld->mu_state); break; case CALL_WAITING_FOR_PICK: case CALL_WAITING_FOR_CONFIG: case CALL_WAITING_FOR_CALL: case CALL_WAITING_FOR_SEND: - gpr_log (GPR_ERROR, "should never reach here"); - abort (); + gpr_log(GPR_ERROR, "should never reach here"); + abort(); break; - } + } } /* Constructor for channel_data */ -static void -init_channel_elem (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_channel * master, const grpc_channel_args * args, grpc_mdctx * metadata_context, int is_first, int is_last) -{ +static void init_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, grpc_channel *master, + const grpc_channel_args *args, + grpc_mdctx *metadata_context, int is_first, + int is_last) { channel_data *chand = elem->channel_data; - memset (chand, 0, sizeof (*chand)); + memset(chand, 0, sizeof(*chand)); - GPR_ASSERT (is_last); - GPR_ASSERT (elem->filter == &grpc_client_channel_filter); + GPR_ASSERT(is_last); + GPR_ASSERT(elem->filter == &grpc_client_channel_filter); - gpr_mu_init (&chand->mu_config); + gpr_mu_init(&chand->mu_config); chand->mdctx = metadata_context; chand->master = master; - grpc_pollset_set_init (&chand->pollset_set); - grpc_closure_init (&chand->on_config_changed, cc_on_config_changed, chand); + grpc_pollset_set_init(&chand->pollset_set); + grpc_closure_init(&chand->on_config_changed, cc_on_config_changed, chand); - grpc_connectivity_state_init (&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); + grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, + "client_channel"); } /* Destructor for channel_data */ -static void -destroy_channel_elem (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem) -{ +static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem) { channel_data *chand = elem->channel_data; - if (chand->resolver != NULL) - { - grpc_resolver_shutdown (exec_ctx, chand->resolver); - GRPC_RESOLVER_UNREF (exec_ctx, chand->resolver, "channel"); - } - if (chand->lb_policy != NULL) - { - GRPC_LB_POLICY_UNREF (exec_ctx, chand->lb_policy, "channel"); - } - grpc_connectivity_state_destroy (exec_ctx, &chand->state_tracker); - grpc_pollset_set_destroy (&chand->pollset_set); - gpr_mu_destroy (&chand->mu_config); + if (chand->resolver != NULL) { + grpc_resolver_shutdown(exec_ctx, chand->resolver); + GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); + } + if (chand->lb_policy != NULL) { + GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); + } + grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); + grpc_pollset_set_destroy(&chand->pollset_set); + gpr_mu_destroy(&chand->mu_config); } const grpc_channel_filter grpc_client_channel_filter = { - cc_start_transport_stream_op, - cc_start_transport_op, - sizeof (call_data), - init_call_elem, - destroy_call_elem, - sizeof (channel_data), - init_channel_elem, - destroy_channel_elem, - cc_get_peer, - "client-channel", + cc_start_transport_stream_op, + cc_start_transport_op, + sizeof(call_data), + init_call_elem, + destroy_call_elem, + sizeof(channel_data), + init_channel_elem, + destroy_channel_elem, + cc_get_peer, + "client-channel", }; -void -grpc_client_channel_set_resolver (grpc_exec_ctx * exec_ctx, grpc_channel_stack * channel_stack, grpc_resolver * resolver) -{ +void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx, + grpc_channel_stack *channel_stack, + grpc_resolver *resolver) { /* post construction initialization: set the transport setup pointer */ - grpc_channel_element *elem = grpc_channel_stack_last_element (channel_stack); + grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack); channel_data *chand = elem->channel_data; - gpr_mu_lock (&chand->mu_config); - GPR_ASSERT (!chand->resolver); + gpr_mu_lock(&chand->mu_config); + GPR_ASSERT(!chand->resolver); chand->resolver = resolver; - GRPC_RESOLVER_REF (resolver, "channel"); - if (!grpc_closure_list_empty (chand->waiting_for_config_closures) || chand->exit_idle_when_lb_policy_arrives) - { - chand->started_resolving = 1; - GRPC_CHANNEL_INTERNAL_REF (chand->master, "resolver"); - grpc_resolver_next (exec_ctx, resolver, &chand->incoming_configuration, &chand->on_config_changed); - } - gpr_mu_unlock (&chand->mu_config); + GRPC_RESOLVER_REF(resolver, "channel"); + if (!grpc_closure_list_empty(chand->waiting_for_config_closures) || + chand->exit_idle_when_lb_policy_arrives) { + chand->started_resolving = 1; + GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); + grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration, + &chand->on_config_changed); + } + gpr_mu_unlock(&chand->mu_config); } -grpc_connectivity_state -grpc_client_channel_check_connectivity_state (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, int try_to_connect) -{ +grpc_connectivity_state grpc_client_channel_check_connectivity_state( + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) { channel_data *chand = elem->channel_data; grpc_connectivity_state out; - gpr_mu_lock (&chand->mu_config); - out = grpc_connectivity_state_check (&chand->state_tracker); - if (out == GRPC_CHANNEL_IDLE && try_to_connect) - { - if (chand->lb_policy != NULL) - { - grpc_lb_policy_exit_idle (exec_ctx, chand->lb_policy); - } - else - { - chand->exit_idle_when_lb_policy_arrives = 1; - if (!chand->started_resolving && chand->resolver != NULL) - { - GRPC_CHANNEL_INTERNAL_REF (chand->master, "resolver"); - chand->started_resolving = 1; - grpc_resolver_next (exec_ctx, chand->resolver, &chand->incoming_configuration, &chand->on_config_changed); - } - } - } - gpr_mu_unlock (&chand->mu_config); + gpr_mu_lock(&chand->mu_config); + out = grpc_connectivity_state_check(&chand->state_tracker); + if (out == GRPC_CHANNEL_IDLE && try_to_connect) { + if (chand->lb_policy != NULL) { + grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy); + } else { + chand->exit_idle_when_lb_policy_arrives = 1; + if (!chand->started_resolving && chand->resolver != NULL) { + GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); + chand->started_resolving = 1; + grpc_resolver_next(exec_ctx, chand->resolver, + &chand->incoming_configuration, + &chand->on_config_changed); + } + } + } + gpr_mu_unlock(&chand->mu_config); return out; } -void -grpc_client_channel_watch_connectivity_state (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_connectivity_state * state, grpc_closure * on_complete) -{ +void grpc_client_channel_watch_connectivity_state( + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_connectivity_state *state, grpc_closure *on_complete) { channel_data *chand = elem->channel_data; - gpr_mu_lock (&chand->mu_config); - grpc_connectivity_state_notify_on_state_change (exec_ctx, &chand->state_tracker, state, on_complete); - gpr_mu_unlock (&chand->mu_config); + gpr_mu_lock(&chand->mu_config); + grpc_connectivity_state_notify_on_state_change( + exec_ctx, &chand->state_tracker, state, on_complete); + gpr_mu_unlock(&chand->mu_config); } -grpc_pollset_set * -grpc_client_channel_get_connecting_pollset_set (grpc_channel_element * elem) -{ +grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set( + grpc_channel_element *elem) { channel_data *chand = elem->channel_data; return &chand->pollset_set; } -void -grpc_client_channel_add_interested_party (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_pollset * pollset) -{ +void grpc_client_channel_add_interested_party(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_pollset *pollset) { channel_data *chand = elem->channel_data; - grpc_pollset_set_add_pollset (exec_ctx, &chand->pollset_set, pollset); + grpc_pollset_set_add_pollset(exec_ctx, &chand->pollset_set, pollset); } -void -grpc_client_channel_del_interested_party (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_pollset * pollset) -{ +void grpc_client_channel_del_interested_party(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_pollset *pollset) { channel_data *chand = elem->channel_data; - grpc_pollset_set_del_pollset (exec_ctx, &chand->pollset_set, pollset); + grpc_pollset_set_del_pollset(exec_ctx, &chand->pollset_set, pollset); } diff --git a/src/core/channel/client_channel.h b/src/core/channel/client_channel.h index 5af9794727..5103f07a43 100644 --- a/src/core/channel/client_channel.h +++ b/src/core/channel/client_channel.h @@ -49,15 +49,25 @@ extern const grpc_channel_filter grpc_client_channel_filter; /* post-construction initializer to let the client channel know which transport setup it should cancel upon destruction, or initiate when it needs a connection */ -void grpc_client_channel_set_resolver (grpc_exec_ctx * exec_ctx, grpc_channel_stack * channel_stack, grpc_resolver * resolver); +void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx, + grpc_channel_stack *channel_stack, + grpc_resolver *resolver); -grpc_connectivity_state grpc_client_channel_check_connectivity_state (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, int try_to_connect); +grpc_connectivity_state grpc_client_channel_check_connectivity_state( + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect); -void grpc_client_channel_watch_connectivity_state (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_connectivity_state * state, grpc_closure * on_complete); +void grpc_client_channel_watch_connectivity_state( + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_connectivity_state *state, grpc_closure *on_complete); -grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set (grpc_channel_element * elem); +grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set( + grpc_channel_element *elem); -void grpc_client_channel_add_interested_party (grpc_exec_ctx * exec_ctx, grpc_channel_element * channel, grpc_pollset * pollset); -void grpc_client_channel_del_interested_party (grpc_exec_ctx * exec_ctx, grpc_channel_element * channel, grpc_pollset * pollset); +void grpc_client_channel_add_interested_party(grpc_exec_ctx *exec_ctx, + grpc_channel_element *channel, + grpc_pollset *pollset); +void grpc_client_channel_del_interested_party(grpc_exec_ctx *exec_ctx, + grpc_channel_element *channel, + grpc_pollset *pollset); #endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_CHANNEL_H */ diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index 6acdc72075..f8dbe8c817 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -44,14 +44,13 @@ #include "src/core/compression/message_compress.h" #include "src/core/support/string.h" -typedef struct call_data -{ +typedef struct call_data { gpr_slice_buffer slices; /**< Buffers up input slices to be compressed */ grpc_linked_mdelem compression_algorithm_storage; grpc_linked_mdelem accept_encoding_storage; - gpr_uint32 remaining_slice_bytes; - /**< Input data to be read, as per BEGIN_MESSAGE */ - int written_initial_metadata; /**< Already processed initial md? */ + gpr_uint32 remaining_slice_bytes; + /**< Input data to be read, as per BEGIN_MESSAGE */ + int written_initial_metadata; /**< Already processed initial md? */ /** Compression algorithm we'll try to use. It may be given by incoming * metadata, or by the channel's default compression settings. */ grpc_compression_algorithm compression_algorithm; @@ -59,8 +58,7 @@ typedef struct call_data int has_compression_algorithm; } call_data; -typedef struct channel_data -{ +typedef struct channel_data { /** Metadata key for the incoming (requested) compression algorithm */ grpc_mdstr *mdstr_request_compression_algorithm_key; /** Metadata key for the outgoing (used) compression algorithm */ @@ -82,62 +80,59 @@ typedef struct channel_data * larger than the raw input). * * Returns 1 if the data was actually compress and 0 otherwise. */ -static int -compress_send_sb (grpc_compression_algorithm algorithm, gpr_slice_buffer * slices) -{ +static int compress_send_sb(grpc_compression_algorithm algorithm, + gpr_slice_buffer *slices) { int did_compress; gpr_slice_buffer tmp; - gpr_slice_buffer_init (&tmp); - did_compress = grpc_msg_compress (algorithm, slices, &tmp); - if (did_compress) - { - gpr_slice_buffer_swap (slices, &tmp); - } - gpr_slice_buffer_destroy (&tmp); + gpr_slice_buffer_init(&tmp); + did_compress = grpc_msg_compress(algorithm, slices, &tmp); + if (did_compress) { + gpr_slice_buffer_swap(slices, &tmp); + } + gpr_slice_buffer_destroy(&tmp); return did_compress; } /** For each \a md element from the incoming metadata, filter out the entry for * "grpc-encoding", using its value to populate the call data's * compression_algorithm field. */ -static grpc_mdelem * -compression_md_filter (void *user_data, grpc_mdelem * md) -{ +static grpc_mdelem *compression_md_filter(void *user_data, grpc_mdelem *md) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; - if (md->key == channeld->mdstr_request_compression_algorithm_key) - { - const char *md_c_str = grpc_mdstr_as_c_string (md->value); - if (!grpc_compression_algorithm_parse (md_c_str, strlen (md_c_str), &calld->compression_algorithm)) - { - gpr_log (GPR_ERROR, "Invalid compression algorithm: '%s' (unknown). Ignoring.", md_c_str); - calld->compression_algorithm = GRPC_COMPRESS_NONE; - } - if (grpc_compression_options_is_algorithm_enabled (&channeld->compression_options, calld->compression_algorithm) == 0) - { - gpr_log (GPR_ERROR, "Invalid compression algorithm: '%s' (previously disabled). " "Ignoring.", md_c_str); - calld->compression_algorithm = GRPC_COMPRESS_NONE; - } - calld->has_compression_algorithm = 1; - return NULL; + if (md->key == channeld->mdstr_request_compression_algorithm_key) { + const char *md_c_str = grpc_mdstr_as_c_string(md->value); + if (!grpc_compression_algorithm_parse(md_c_str, strlen(md_c_str), + &calld->compression_algorithm)) { + gpr_log(GPR_ERROR, + "Invalid compression algorithm: '%s' (unknown). Ignoring.", + md_c_str); + calld->compression_algorithm = GRPC_COMPRESS_NONE; + } + if (grpc_compression_options_is_algorithm_enabled( + &channeld->compression_options, calld->compression_algorithm) == + 0) { + gpr_log(GPR_ERROR, + "Invalid compression algorithm: '%s' (previously disabled). " + "Ignoring.", + md_c_str); + calld->compression_algorithm = GRPC_COMPRESS_NONE; } + calld->has_compression_algorithm = 1; + return NULL; + } return md; } -static int -skip_compression (channel_data * channeld, call_data * calld) -{ - if (calld->has_compression_algorithm) - { - if (calld->compression_algorithm == GRPC_COMPRESS_NONE) - { - return 1; - } - return 0; /* we have an actual call-specific algorithm */ +static int skip_compression(channel_data *channeld, call_data *calld) { + if (calld->has_compression_algorithm) { + if (calld->compression_algorithm == GRPC_COMPRESS_NONE) { + return 1; } + return 0; /* we have an actual call-specific algorithm */ + } /* no per-call compression override */ return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE; } @@ -146,127 +141,126 @@ skip_compression (channel_data * channeld, call_data * calld) * the associated GRPC_OP_BEGIN_MESSAGE accordingly (new compressed length, * flags indicating compression is in effect) and replaces \a send_ops with it. * */ -static void -finish_compressed_sopb (grpc_stream_op_buffer * send_ops, grpc_call_element * elem) -{ +static void finish_compressed_sopb(grpc_stream_op_buffer *send_ops, + grpc_call_element *elem) { size_t i; call_data *calld = elem->call_data; - int new_slices_added = 0; /* GPR_FALSE */ + int new_slices_added = 0; /* GPR_FALSE */ grpc_metadata_batch metadata; grpc_stream_op_buffer new_send_ops; - grpc_sopb_init (&new_send_ops); - - for (i = 0; i < send_ops->nops; i++) - { - grpc_stream_op *sop = &send_ops->ops[i]; - switch (sop->type) - { - case GRPC_OP_BEGIN_MESSAGE: - GPR_ASSERT (calld->slices.length <= GPR_UINT32_MAX); - grpc_sopb_add_begin_message (&new_send_ops, (gpr_uint32) calld->slices.length, sop->data.begin_message.flags | GRPC_WRITE_INTERNAL_COMPRESS); - break; - case GRPC_OP_SLICE: - /* Once we reach the slices section of the original buffer, simply add - * all the new (compressed) slices. We obviously want to do this only - * once, hence the "new_slices_added" guard. */ - if (!new_slices_added) - { - size_t j; - for (j = 0; j < calld->slices.count; ++j) - { - grpc_sopb_add_slice (&new_send_ops, gpr_slice_ref (calld->slices.slices[j])); - } - new_slices_added = 1; /* GPR_TRUE */ - } - break; - case GRPC_OP_METADATA: - /* move the metadata to the new buffer. */ - grpc_metadata_batch_move (&metadata, &sop->data.metadata); - grpc_sopb_add_metadata (&new_send_ops, metadata); - break; - case GRPC_NO_OP: - break; - } + grpc_sopb_init(&new_send_ops); + + for (i = 0; i < send_ops->nops; i++) { + grpc_stream_op *sop = &send_ops->ops[i]; + switch (sop->type) { + case GRPC_OP_BEGIN_MESSAGE: + GPR_ASSERT(calld->slices.length <= GPR_UINT32_MAX); + grpc_sopb_add_begin_message( + &new_send_ops, (gpr_uint32)calld->slices.length, + sop->data.begin_message.flags | GRPC_WRITE_INTERNAL_COMPRESS); + break; + case GRPC_OP_SLICE: + /* Once we reach the slices section of the original buffer, simply add + * all the new (compressed) slices. We obviously want to do this only + * once, hence the "new_slices_added" guard. */ + if (!new_slices_added) { + size_t j; + for (j = 0; j < calld->slices.count; ++j) { + grpc_sopb_add_slice(&new_send_ops, + gpr_slice_ref(calld->slices.slices[j])); + } + new_slices_added = 1; /* GPR_TRUE */ + } + break; + case GRPC_OP_METADATA: + /* move the metadata to the new buffer. */ + grpc_metadata_batch_move(&metadata, &sop->data.metadata); + grpc_sopb_add_metadata(&new_send_ops, metadata); + break; + case GRPC_NO_OP: + break; } - grpc_sopb_swap (send_ops, &new_send_ops); - grpc_sopb_destroy (&new_send_ops); + } + grpc_sopb_swap(send_ops, &new_send_ops); + grpc_sopb_destroy(&new_send_ops); } /** Filter's "main" function, called for any incoming grpc_transport_stream_op * instance that holds a non-zero number of send operations, accesible to this * function in \a send_ops. */ -static void -process_send_ops (grpc_call_element * elem, grpc_stream_op_buffer * send_ops) -{ +static void process_send_ops(grpc_call_element *elem, + grpc_stream_op_buffer *send_ops) { call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; size_t i; int did_compress = 0; /* In streaming calls, we need to reset the previously accumulated slices */ - gpr_slice_buffer_reset_and_unref (&calld->slices); - for (i = 0; i < send_ops->nops; ++i) - { - grpc_stream_op *sop = &send_ops->ops[i]; - switch (sop->type) - { - case GRPC_OP_BEGIN_MESSAGE: - /* buffer up slices until we've processed all the expected ones (as - * given by GRPC_OP_BEGIN_MESSAGE) */ - calld->remaining_slice_bytes = sop->data.begin_message.length; - if (sop->data.begin_message.flags & GRPC_WRITE_NO_COMPRESS) - { - calld->has_compression_algorithm = 1; /* GPR_TRUE */ - calld->compression_algorithm = GRPC_COMPRESS_NONE; - } - break; - case GRPC_OP_METADATA: - if (!calld->written_initial_metadata) - { - /* Parse incoming request for compression. If any, it'll be available - * at calld->compression_algorithm */ - grpc_metadata_batch_filter (&(sop->data.metadata), compression_md_filter, elem); - if (!calld->has_compression_algorithm) - { - /* If no algorithm was found in the metadata and we aren't - * exceptionally skipping compression, fall back to the channel - * default */ - calld->compression_algorithm = channeld->default_compression_algorithm; - calld->has_compression_algorithm = 1; /* GPR_TRUE */ - } - /* hint compression algorithm */ - grpc_metadata_batch_add_tail (&(sop->data.metadata), &calld->compression_algorithm_storage, GRPC_MDELEM_REF (channeld->mdelem_compression_algorithms[calld->compression_algorithm])); - - /* convey supported compression algorithms */ - grpc_metadata_batch_add_tail (&(sop->data.metadata), &calld->accept_encoding_storage, GRPC_MDELEM_REF (channeld->mdelem_accept_encoding)); - - calld->written_initial_metadata = 1; /* GPR_TRUE */ - } - break; - case GRPC_OP_SLICE: - if (skip_compression (channeld, calld)) - continue; - GPR_ASSERT (calld->remaining_slice_bytes > 0); - /* Increase input ref count, gpr_slice_buffer_add takes ownership. */ - gpr_slice_buffer_add (&calld->slices, gpr_slice_ref (sop->data.slice)); - GPR_ASSERT (GPR_SLICE_LENGTH (sop->data.slice) >= calld->remaining_slice_bytes); - calld->remaining_slice_bytes -= (gpr_uint32) GPR_SLICE_LENGTH (sop->data.slice); - if (calld->remaining_slice_bytes == 0) - { - did_compress = compress_send_sb (calld->compression_algorithm, &calld->slices); - } - break; - case GRPC_NO_OP: - break; - } + gpr_slice_buffer_reset_and_unref(&calld->slices); + for (i = 0; i < send_ops->nops; ++i) { + grpc_stream_op *sop = &send_ops->ops[i]; + switch (sop->type) { + case GRPC_OP_BEGIN_MESSAGE: + /* buffer up slices until we've processed all the expected ones (as + * given by GRPC_OP_BEGIN_MESSAGE) */ + calld->remaining_slice_bytes = sop->data.begin_message.length; + if (sop->data.begin_message.flags & GRPC_WRITE_NO_COMPRESS) { + calld->has_compression_algorithm = 1; /* GPR_TRUE */ + calld->compression_algorithm = GRPC_COMPRESS_NONE; + } + break; + case GRPC_OP_METADATA: + if (!calld->written_initial_metadata) { + /* Parse incoming request for compression. If any, it'll be available + * at calld->compression_algorithm */ + grpc_metadata_batch_filter(&(sop->data.metadata), + compression_md_filter, elem); + if (!calld->has_compression_algorithm) { + /* If no algorithm was found in the metadata and we aren't + * exceptionally skipping compression, fall back to the channel + * default */ + calld->compression_algorithm = + channeld->default_compression_algorithm; + calld->has_compression_algorithm = 1; /* GPR_TRUE */ + } + /* hint compression algorithm */ + grpc_metadata_batch_add_tail( + &(sop->data.metadata), &calld->compression_algorithm_storage, + GRPC_MDELEM_REF(channeld->mdelem_compression_algorithms + [calld->compression_algorithm])); + + /* convey supported compression algorithms */ + grpc_metadata_batch_add_tail( + &(sop->data.metadata), &calld->accept_encoding_storage, + GRPC_MDELEM_REF(channeld->mdelem_accept_encoding)); + + calld->written_initial_metadata = 1; /* GPR_TRUE */ + } + break; + case GRPC_OP_SLICE: + if (skip_compression(channeld, calld)) continue; + GPR_ASSERT(calld->remaining_slice_bytes > 0); + /* Increase input ref count, gpr_slice_buffer_add takes ownership. */ + gpr_slice_buffer_add(&calld->slices, gpr_slice_ref(sop->data.slice)); + GPR_ASSERT(GPR_SLICE_LENGTH(sop->data.slice) >= + calld->remaining_slice_bytes); + calld->remaining_slice_bytes -= + (gpr_uint32)GPR_SLICE_LENGTH(sop->data.slice); + if (calld->remaining_slice_bytes == 0) { + did_compress = + compress_send_sb(calld->compression_algorithm, &calld->slices); + } + break; + case GRPC_NO_OP: + break; } + } /* Modify the send_ops stream_op_buffer depending on whether compression was * carried out */ - if (did_compress) - { - finish_compressed_sopb (send_ops, elem); - } + if (did_compress) { + finish_compressed_sopb(send_ops, elem); + } } /* Called either: @@ -274,52 +268,49 @@ process_send_ops (grpc_call_element * elem, grpc_stream_op_buffer * send_ops) - a network event (or similar) from below, to receive something op contains type and call direction information, in addition to the data that is being sent or received. */ -static void -compress_start_transport_stream_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op) -{ - if (op->send_ops && op->send_ops->nops > 0) - { - process_send_ops (elem, op->send_ops); - } +static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op) { + if (op->send_ops && op->send_ops->nops > 0) { + process_send_ops(elem, op->send_ops); + } /* pass control down the stack */ - grpc_call_next_op (exec_ctx, elem, op); + grpc_call_next_op(exec_ctx, elem, op); } /* Constructor for call_data */ -static void -init_call_elem (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, const void *server_transport_data, grpc_transport_stream_op * initial_op) -{ +static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + const void *server_transport_data, + grpc_transport_stream_op *initial_op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; /* initialize members */ - gpr_slice_buffer_init (&calld->slices); + gpr_slice_buffer_init(&calld->slices); calld->has_compression_algorithm = 0; - calld->written_initial_metadata = 0; /* GPR_FALSE */ - - if (initial_op) - { - if (initial_op->send_ops && initial_op->send_ops->nops > 0) - { - process_send_ops (elem, initial_op->send_ops); - } + calld->written_initial_metadata = 0; /* GPR_FALSE */ + + if (initial_op) { + if (initial_op->send_ops && initial_op->send_ops->nops > 0) { + process_send_ops(elem, initial_op->send_ops); } + } } /* Destructor for call_data */ -static void -destroy_call_elem (grpc_exec_ctx * exec_ctx, grpc_call_element * elem) -{ +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; - gpr_slice_buffer_destroy (&calld->slices); + gpr_slice_buffer_destroy(&calld->slices); } /* Constructor for channel_data */ -static void -init_channel_elem (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_channel * master, const grpc_channel_args * args, grpc_mdctx * mdctx, int is_first, int is_last) -{ +static void init_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, grpc_channel *master, + const grpc_channel_args *args, grpc_mdctx *mdctx, + int is_first, int is_last) { channel_data *channeld = elem->channel_data; grpc_compression_algorithm algo_idx; const char *supported_algorithms_names[GRPC_COMPRESS_ALGORITHMS_COUNT - 1]; @@ -327,72 +318,82 @@ init_channel_elem (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_c char *accept_encoding_str; size_t accept_encoding_str_len; - grpc_compression_options_init (&channeld->compression_options); - channeld->compression_options.enabled_algorithms_bitset = (gpr_uint32) grpc_channel_args_compression_algorithm_get_states (args); + grpc_compression_options_init(&channeld->compression_options); + channeld->compression_options.enabled_algorithms_bitset = + (gpr_uint32)grpc_channel_args_compression_algorithm_get_states(args); - channeld->default_compression_algorithm = grpc_channel_args_get_compression_algorithm (args); + channeld->default_compression_algorithm = + grpc_channel_args_get_compression_algorithm(args); /* Make sure the default isn't disabled. */ - GPR_ASSERT (grpc_compression_options_is_algorithm_enabled (&channeld->compression_options, channeld->default_compression_algorithm)); - channeld->compression_options.default_compression_algorithm = channeld->default_compression_algorithm; - - channeld->mdstr_request_compression_algorithm_key = grpc_mdstr_from_string (mdctx, GRPC_COMPRESS_REQUEST_ALGORITHM_KEY, 0); - - channeld->mdstr_outgoing_compression_algorithm_key = grpc_mdstr_from_string (mdctx, "grpc-encoding", 0); - - channeld->mdstr_compression_capabilities_key = grpc_mdstr_from_string (mdctx, "grpc-accept-encoding", 0); - - for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) - { - char *algorithm_name; - /* skip disabled algorithms */ - if (grpc_compression_options_is_algorithm_enabled (&channeld->compression_options, algo_idx) == 0) - { - continue; - } - GPR_ASSERT (grpc_compression_algorithm_name (algo_idx, &algorithm_name) != 0); - channeld->mdelem_compression_algorithms[algo_idx] = grpc_mdelem_from_metadata_strings (mdctx, GRPC_MDSTR_REF (channeld->mdstr_outgoing_compression_algorithm_key), grpc_mdstr_from_string (mdctx, algorithm_name, 0)); - if (algo_idx > 0) - { - supported_algorithms_names[supported_algorithms_idx++] = algorithm_name; - } + GPR_ASSERT(grpc_compression_options_is_algorithm_enabled( + &channeld->compression_options, channeld->default_compression_algorithm)); + channeld->compression_options.default_compression_algorithm = + channeld->default_compression_algorithm; + + channeld->mdstr_request_compression_algorithm_key = + grpc_mdstr_from_string(mdctx, GRPC_COMPRESS_REQUEST_ALGORITHM_KEY, 0); + + channeld->mdstr_outgoing_compression_algorithm_key = + grpc_mdstr_from_string(mdctx, "grpc-encoding", 0); + + channeld->mdstr_compression_capabilities_key = + grpc_mdstr_from_string(mdctx, "grpc-accept-encoding", 0); + + for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) { + char *algorithm_name; + /* skip disabled algorithms */ + if (grpc_compression_options_is_algorithm_enabled( + &channeld->compression_options, algo_idx) == 0) { + continue; + } + GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorithm_name) != 0); + channeld->mdelem_compression_algorithms[algo_idx] = + grpc_mdelem_from_metadata_strings( + mdctx, + GRPC_MDSTR_REF(channeld->mdstr_outgoing_compression_algorithm_key), + grpc_mdstr_from_string(mdctx, algorithm_name, 0)); + if (algo_idx > 0) { + supported_algorithms_names[supported_algorithms_idx++] = algorithm_name; } + } /* TODO(dgq): gpr_strjoin_sep could be made to work with statically allocated * arrays, as to avoid the heap allocs */ - accept_encoding_str = gpr_strjoin_sep (supported_algorithms_names, supported_algorithms_idx, ",", &accept_encoding_str_len); + accept_encoding_str = + gpr_strjoin_sep(supported_algorithms_names, supported_algorithms_idx, ",", + &accept_encoding_str_len); - channeld->mdelem_accept_encoding = grpc_mdelem_from_metadata_strings (mdctx, GRPC_MDSTR_REF (channeld->mdstr_compression_capabilities_key), grpc_mdstr_from_string (mdctx, accept_encoding_str, 0)); - gpr_free (accept_encoding_str); + channeld->mdelem_accept_encoding = grpc_mdelem_from_metadata_strings( + mdctx, GRPC_MDSTR_REF(channeld->mdstr_compression_capabilities_key), + grpc_mdstr_from_string(mdctx, accept_encoding_str, 0)); + gpr_free(accept_encoding_str); - GPR_ASSERT (!is_last); + GPR_ASSERT(!is_last); } /* Destructor for channel data */ -static void -destroy_channel_elem (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem) -{ +static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem) { channel_data *channeld = elem->channel_data; grpc_compression_algorithm algo_idx; - GRPC_MDSTR_UNREF (channeld->mdstr_request_compression_algorithm_key); - GRPC_MDSTR_UNREF (channeld->mdstr_outgoing_compression_algorithm_key); - GRPC_MDSTR_UNREF (channeld->mdstr_compression_capabilities_key); - for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) - { - GRPC_MDELEM_UNREF (channeld->mdelem_compression_algorithms[algo_idx]); - } - GRPC_MDELEM_UNREF (channeld->mdelem_accept_encoding); + GRPC_MDSTR_UNREF(channeld->mdstr_request_compression_algorithm_key); + GRPC_MDSTR_UNREF(channeld->mdstr_outgoing_compression_algorithm_key); + GRPC_MDSTR_UNREF(channeld->mdstr_compression_capabilities_key); + for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) { + GRPC_MDELEM_UNREF(channeld->mdelem_compression_algorithms[algo_idx]); + } + GRPC_MDELEM_UNREF(channeld->mdelem_accept_encoding); } const grpc_channel_filter grpc_compress_filter = { - compress_start_transport_stream_op, - grpc_channel_next_op, - sizeof (call_data), - init_call_elem, - destroy_call_elem, - sizeof (channel_data), - init_channel_elem, - destroy_channel_elem, - grpc_call_next_get_peer, - "compress" -}; + compress_start_transport_stream_op, + grpc_channel_next_op, + sizeof(call_data), + init_call_elem, + destroy_call_elem, + sizeof(channel_data), + init_channel_elem, + destroy_channel_elem, + grpc_call_next_get_peer, + "compress"}; diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index bdb75474ff..ea701bc284 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -46,15 +46,11 @@ #define MAX_BUFFER_LENGTH 8192 -typedef struct connected_channel_channel_data -{ +typedef struct connected_channel_channel_data { grpc_transport *transport; } channel_data; -typedef struct connected_channel_call_data -{ - void *unused; -} call_data; +typedef struct connected_channel_call_data { void *unused; } call_data; /* We perform a small hack to locate transport data alongside the connected channel data in call allocations, to allow everything to be pulled in minimal @@ -65,95 +61,95 @@ typedef struct connected_channel_call_data /* Intercept a call operation and either push it directly up or translate it into transport stream operations */ -static void -con_start_transport_stream_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op) -{ +static void con_start_transport_stream_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - GPR_ASSERT (elem->filter == &grpc_connected_channel_filter); - GRPC_CALL_LOG_OP (GPR_INFO, elem, op); + GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - grpc_transport_perform_stream_op (exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA (calld), op); + grpc_transport_perform_stream_op(exec_ctx, chand->transport, + TRANSPORT_STREAM_FROM_CALL_DATA(calld), op); } -static void -con_start_transport_op (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_transport_op * op) -{ +static void con_start_transport_op(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_transport_op *op) { channel_data *chand = elem->channel_data; - grpc_transport_perform_op (exec_ctx, chand->transport, op); + grpc_transport_perform_op(exec_ctx, chand->transport, op); } /* Constructor for call_data */ -static void -init_call_elem (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, const void *server_transport_data, grpc_transport_stream_op * initial_op) -{ +static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + const void *server_transport_data, + grpc_transport_stream_op *initial_op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; int r; - GPR_ASSERT (elem->filter == &grpc_connected_channel_filter); - r = grpc_transport_init_stream (exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA (calld), server_transport_data, initial_op); - GPR_ASSERT (r == 0); + GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); + r = grpc_transport_init_stream(exec_ctx, chand->transport, + TRANSPORT_STREAM_FROM_CALL_DATA(calld), + server_transport_data, initial_op); + GPR_ASSERT(r == 0); } /* Destructor for call_data */ -static void -destroy_call_elem (grpc_exec_ctx * exec_ctx, grpc_call_element * elem) -{ +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - GPR_ASSERT (elem->filter == &grpc_connected_channel_filter); - grpc_transport_destroy_stream (exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA (calld)); + GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); + grpc_transport_destroy_stream(exec_ctx, chand->transport, + TRANSPORT_STREAM_FROM_CALL_DATA(calld)); } /* Constructor for channel_data */ -static void -init_channel_elem (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_channel * master, const grpc_channel_args * args, grpc_mdctx * mdctx, int is_first, int is_last) -{ - channel_data *cd = (channel_data *) elem->channel_data; - GPR_ASSERT (is_last); - GPR_ASSERT (elem->filter == &grpc_connected_channel_filter); +static void init_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, grpc_channel *master, + const grpc_channel_args *args, grpc_mdctx *mdctx, + int is_first, int is_last) { + channel_data *cd = (channel_data *)elem->channel_data; + GPR_ASSERT(is_last); + GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); cd->transport = NULL; } /* Destructor for channel_data */ -static void -destroy_channel_elem (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem) -{ - channel_data *cd = (channel_data *) elem->channel_data; - GPR_ASSERT (elem->filter == &grpc_connected_channel_filter); - grpc_transport_destroy (exec_ctx, cd->transport); +static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem) { + channel_data *cd = (channel_data *)elem->channel_data; + GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); + grpc_transport_destroy(exec_ctx, cd->transport); } -static char * -con_get_peer (grpc_exec_ctx * exec_ctx, grpc_call_element * elem) -{ +static char *con_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { channel_data *chand = elem->channel_data; - return grpc_transport_get_peer (exec_ctx, chand->transport); + return grpc_transport_get_peer(exec_ctx, chand->transport); } const grpc_channel_filter grpc_connected_channel_filter = { - con_start_transport_stream_op, - con_start_transport_op, - sizeof (call_data), - init_call_elem, - destroy_call_elem, - sizeof (channel_data), - init_channel_elem, - destroy_channel_elem, - con_get_peer, - "connected", + con_start_transport_stream_op, + con_start_transport_op, + sizeof(call_data), + init_call_elem, + destroy_call_elem, + sizeof(channel_data), + init_channel_elem, + destroy_channel_elem, + con_get_peer, + "connected", }; -void -grpc_connected_channel_bind_transport (grpc_channel_stack * channel_stack, grpc_transport * transport) -{ +void grpc_connected_channel_bind_transport(grpc_channel_stack *channel_stack, + grpc_transport *transport) { /* Assumes that the connected channel filter is always the last filter in a channel stack */ - grpc_channel_element *elem = grpc_channel_stack_last_element (channel_stack); - channel_data *cd = (channel_data *) elem->channel_data; - GPR_ASSERT (elem->filter == &grpc_connected_channel_filter); - GPR_ASSERT (cd->transport == NULL); + grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack); + channel_data *cd = (channel_data *)elem->channel_data; + GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); + GPR_ASSERT(cd->transport == NULL); cd->transport = transport; /* HACK(ctiller): increase call stack size for the channel to make space @@ -162,5 +158,5 @@ grpc_connected_channel_bind_transport (grpc_channel_stack * channel_stack, grpc_ This is only "safe" because call stacks place no additional data after the last call element, and the last call element MUST be the connected channel. */ - channel_stack->call_stack_size += grpc_transport_stream_size (transport); + channel_stack->call_stack_size += grpc_transport_stream_size(transport); } diff --git a/src/core/channel/connected_channel.h b/src/core/channel/connected_channel.h index 67bcd6c911..eac6eb7ebe 100644 --- a/src/core/channel/connected_channel.h +++ b/src/core/channel/connected_channel.h @@ -43,6 +43,7 @@ extern const grpc_channel_filter grpc_connected_channel_filter; /* Post construction fixup: set the transport in the connected channel. Must be called before any call stack using this filter is used. */ -void grpc_connected_channel_bind_transport (grpc_channel_stack * channel_stack, grpc_transport * transport); +void grpc_connected_channel_bind_transport(grpc_channel_stack* channel_stack, + grpc_transport* transport); #endif /* GRPC_INTERNAL_CORE_CHANNEL_CONNECTED_CHANNEL_H */ diff --git a/src/core/channel/context.h b/src/core/channel/context.h index 076dc7189b..ac5796b9ef 100644 --- a/src/core/channel/context.h +++ b/src/core/channel/context.h @@ -35,17 +35,15 @@ #define GRPC_INTERNAL_CORE_CHANNEL_CONTEXT_H /* Call object context pointers */ -typedef enum -{ +typedef enum { GRPC_CONTEXT_SECURITY = 0, GRPC_CONTEXT_TRACING, GRPC_CONTEXT_COUNT } grpc_context_index; -typedef struct -{ +typedef struct { void *value; - void (*destroy) (void *); + void (*destroy)(void *); } grpc_call_context_element; #endif /* GRPC_INTERNAL_CORE_CHANNEL_CONTEXT_H */ diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index 3421bdb7b1..721ed4da39 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -37,8 +37,7 @@ #include <grpc/support/string_util.h> #include "src/core/support/string.h" -typedef struct call_data -{ +typedef struct call_data { grpc_linked_mdelem method; grpc_linked_mdelem scheme; grpc_linked_mdelem authority; @@ -58,8 +57,7 @@ typedef struct call_data grpc_closure hc_on_recv; } call_data; -typedef struct channel_data -{ +typedef struct channel_data { grpc_mdelem *te_trailers; grpc_mdelem *method; grpc_mdelem *scheme; @@ -69,255 +67,222 @@ typedef struct channel_data grpc_mdelem *user_agent; } channel_data; -typedef struct -{ +typedef struct { grpc_call_element *elem; grpc_exec_ctx *exec_ctx; } client_recv_filter_args; -static grpc_mdelem * -client_recv_filter (void *user_data, grpc_mdelem * md) -{ +static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) { client_recv_filter_args *a = user_data; grpc_call_element *elem = a->elem; channel_data *channeld = elem->channel_data; - if (md == channeld->status) - { - return NULL; - } - else if (md->key == channeld->status->key) - { - grpc_call_element_send_cancel (a->exec_ctx, elem); - return NULL; - } - else if (md->key == channeld->content_type->key) - { - return NULL; - } + if (md == channeld->status) { + return NULL; + } else if (md->key == channeld->status->key) { + grpc_call_element_send_cancel(a->exec_ctx, elem); + return NULL; + } else if (md->key == channeld->content_type->key) { + return NULL; + } return md; } -static void -hc_on_recv (grpc_exec_ctx * exec_ctx, void *user_data, int success) -{ +static void hc_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, int success) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; size_t i; size_t nops = calld->recv_ops->nops; grpc_stream_op *ops = calld->recv_ops->ops; - for (i = 0; i < nops; i++) - { - grpc_stream_op *op = &ops[i]; - client_recv_filter_args a; - if (op->type != GRPC_OP_METADATA) - continue; - calld->got_initial_metadata = 1; - a.elem = elem; - a.exec_ctx = exec_ctx; - grpc_metadata_batch_filter (&op->data.metadata, client_recv_filter, &a); - } - calld->on_done_recv->cb (exec_ctx, calld->on_done_recv->cb_arg, success); + for (i = 0; i < nops; i++) { + grpc_stream_op *op = &ops[i]; + client_recv_filter_args a; + if (op->type != GRPC_OP_METADATA) continue; + calld->got_initial_metadata = 1; + a.elem = elem; + a.exec_ctx = exec_ctx; + grpc_metadata_batch_filter(&op->data.metadata, client_recv_filter, &a); + } + calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, success); } -static grpc_mdelem * -client_strip_filter (void *user_data, grpc_mdelem * md) -{ +static grpc_mdelem *client_strip_filter(void *user_data, grpc_mdelem *md) { grpc_call_element *elem = user_data; channel_data *channeld = elem->channel_data; /* eat the things we'd like to set ourselves */ - if (md->key == channeld->method->key) - return NULL; - if (md->key == channeld->scheme->key) - return NULL; - if (md->key == channeld->te_trailers->key) - return NULL; - if (md->key == channeld->content_type->key) - return NULL; - if (md->key == channeld->user_agent->key) - return NULL; + if (md->key == channeld->method->key) return NULL; + if (md->key == channeld->scheme->key) return NULL; + if (md->key == channeld->te_trailers->key) return NULL; + if (md->key == channeld->content_type->key) return NULL; + if (md->key == channeld->user_agent->key) return NULL; return md; } -static void -hc_mutate_op (grpc_call_element * elem, grpc_transport_stream_op * op) -{ +static void hc_mutate_op(grpc_call_element *elem, + grpc_transport_stream_op *op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; size_t i; - if (op->send_ops && !calld->sent_initial_metadata) - { - size_t nops = op->send_ops->nops; - grpc_stream_op *ops = op->send_ops->ops; - for (i = 0; i < nops; i++) - { - grpc_stream_op *op = &ops[i]; - if (op->type != GRPC_OP_METADATA) - continue; - calld->sent_initial_metadata = 1; - grpc_metadata_batch_filter (&op->data.metadata, client_strip_filter, elem); - /* Send : prefixed headers, which have to be before any application - layer headers. */ - grpc_metadata_batch_add_head (&op->data.metadata, &calld->method, GRPC_MDELEM_REF (channeld->method)); - grpc_metadata_batch_add_head (&op->data.metadata, &calld->scheme, GRPC_MDELEM_REF (channeld->scheme)); - grpc_metadata_batch_add_tail (&op->data.metadata, &calld->te_trailers, GRPC_MDELEM_REF (channeld->te_trailers)); - grpc_metadata_batch_add_tail (&op->data.metadata, &calld->content_type, GRPC_MDELEM_REF (channeld->content_type)); - grpc_metadata_batch_add_tail (&op->data.metadata, &calld->user_agent, GRPC_MDELEM_REF (channeld->user_agent)); - break; - } + if (op->send_ops && !calld->sent_initial_metadata) { + size_t nops = op->send_ops->nops; + grpc_stream_op *ops = op->send_ops->ops; + for (i = 0; i < nops; i++) { + grpc_stream_op *op = &ops[i]; + if (op->type != GRPC_OP_METADATA) continue; + calld->sent_initial_metadata = 1; + grpc_metadata_batch_filter(&op->data.metadata, client_strip_filter, elem); + /* Send : prefixed headers, which have to be before any application + layer headers. */ + grpc_metadata_batch_add_head(&op->data.metadata, &calld->method, + GRPC_MDELEM_REF(channeld->method)); + grpc_metadata_batch_add_head(&op->data.metadata, &calld->scheme, + GRPC_MDELEM_REF(channeld->scheme)); + grpc_metadata_batch_add_tail(&op->data.metadata, &calld->te_trailers, + GRPC_MDELEM_REF(channeld->te_trailers)); + grpc_metadata_batch_add_tail(&op->data.metadata, &calld->content_type, + GRPC_MDELEM_REF(channeld->content_type)); + grpc_metadata_batch_add_tail(&op->data.metadata, &calld->user_agent, + GRPC_MDELEM_REF(channeld->user_agent)); + break; } + } - if (op->recv_ops && !calld->got_initial_metadata) - { - /* substitute our callback for the higher callback */ - calld->recv_ops = op->recv_ops; - calld->on_done_recv = op->on_done_recv; - op->on_done_recv = &calld->hc_on_recv; - } + if (op->recv_ops && !calld->got_initial_metadata) { + /* substitute our callback for the higher callback */ + calld->recv_ops = op->recv_ops; + calld->on_done_recv = op->on_done_recv; + op->on_done_recv = &calld->hc_on_recv; + } } -static void -hc_start_transport_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op) -{ - GRPC_CALL_LOG_OP (GPR_INFO, elem, op); - hc_mutate_op (elem, op); - grpc_call_next_op (exec_ctx, elem, op); +static void hc_start_transport_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op) { + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + hc_mutate_op(elem, op); + grpc_call_next_op(exec_ctx, elem, op); } /* Constructor for call_data */ -static void -init_call_elem (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, const void *server_transport_data, grpc_transport_stream_op * initial_op) -{ +static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + const void *server_transport_data, + grpc_transport_stream_op *initial_op) { call_data *calld = elem->call_data; calld->sent_initial_metadata = 0; calld->got_initial_metadata = 0; calld->on_done_recv = NULL; - grpc_closure_init (&calld->hc_on_recv, hc_on_recv, elem); - if (initial_op) - hc_mutate_op (elem, initial_op); + grpc_closure_init(&calld->hc_on_recv, hc_on_recv, elem); + if (initial_op) hc_mutate_op(elem, initial_op); } /* Destructor for call_data */ -static void -destroy_call_elem (grpc_exec_ctx * exec_ctx, grpc_call_element * elem) -{ -} +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) {} -static const char * -scheme_from_args (const grpc_channel_args * args) -{ +static const char *scheme_from_args(const grpc_channel_args *args) { unsigned i; - if (args != NULL) - { - for (i = 0; i < args->num_args; ++i) - { - if (args->args[i].type == GRPC_ARG_STRING && strcmp (args->args[i].key, GRPC_ARG_HTTP2_SCHEME) == 0) - { - return args->args[i].value.string; - } - } + if (args != NULL) { + for (i = 0; i < args->num_args; ++i) { + if (args->args[i].type == GRPC_ARG_STRING && + strcmp(args->args[i].key, GRPC_ARG_HTTP2_SCHEME) == 0) { + return args->args[i].value.string; + } } + } return "http"; } -static grpc_mdstr * -user_agent_from_args (grpc_mdctx * mdctx, const grpc_channel_args * args) -{ +static grpc_mdstr *user_agent_from_args(grpc_mdctx *mdctx, + const grpc_channel_args *args) { gpr_strvec v; size_t i; int is_first = 1; char *tmp; grpc_mdstr *result; - gpr_strvec_init (&v); + gpr_strvec_init(&v); - for (i = 0; args && i < args->num_args; i++) - { - if (0 == strcmp (args->args[i].key, GRPC_ARG_PRIMARY_USER_AGENT_STRING)) - { - if (args->args[i].type != GRPC_ARG_STRING) - { - gpr_log (GPR_ERROR, "Channel argument '%s' should be a string", GRPC_ARG_PRIMARY_USER_AGENT_STRING); - } - else - { - if (!is_first) - gpr_strvec_add (&v, gpr_strdup (" ")); - is_first = 0; - gpr_strvec_add (&v, gpr_strdup (args->args[i].value.string)); - } - } + for (i = 0; args && i < args->num_args; i++) { + if (0 == strcmp(args->args[i].key, GRPC_ARG_PRIMARY_USER_AGENT_STRING)) { + if (args->args[i].type != GRPC_ARG_STRING) { + gpr_log(GPR_ERROR, "Channel argument '%s' should be a string", + GRPC_ARG_PRIMARY_USER_AGENT_STRING); + } else { + if (!is_first) gpr_strvec_add(&v, gpr_strdup(" ")); + is_first = 0; + gpr_strvec_add(&v, gpr_strdup(args->args[i].value.string)); + } } + } - gpr_asprintf (&tmp, "%sgrpc-c/%s (%s)", is_first ? "" : " ", grpc_version_string (), GPR_PLATFORM_STRING); + gpr_asprintf(&tmp, "%sgrpc-c/%s (%s)", is_first ? "" : " ", + grpc_version_string(), GPR_PLATFORM_STRING); is_first = 0; - gpr_strvec_add (&v, tmp); + gpr_strvec_add(&v, tmp); - for (i = 0; args && i < args->num_args; i++) - { - if (0 == strcmp (args->args[i].key, GRPC_ARG_SECONDARY_USER_AGENT_STRING)) - { - if (args->args[i].type != GRPC_ARG_STRING) - { - gpr_log (GPR_ERROR, "Channel argument '%s' should be a string", GRPC_ARG_SECONDARY_USER_AGENT_STRING); - } - else - { - if (!is_first) - gpr_strvec_add (&v, gpr_strdup (" ")); - is_first = 0; - gpr_strvec_add (&v, gpr_strdup (args->args[i].value.string)); - } - } + for (i = 0; args && i < args->num_args; i++) { + if (0 == strcmp(args->args[i].key, GRPC_ARG_SECONDARY_USER_AGENT_STRING)) { + if (args->args[i].type != GRPC_ARG_STRING) { + gpr_log(GPR_ERROR, "Channel argument '%s' should be a string", + GRPC_ARG_SECONDARY_USER_AGENT_STRING); + } else { + if (!is_first) gpr_strvec_add(&v, gpr_strdup(" ")); + is_first = 0; + gpr_strvec_add(&v, gpr_strdup(args->args[i].value.string)); + } } + } - tmp = gpr_strvec_flatten (&v, NULL); - gpr_strvec_destroy (&v); - result = grpc_mdstr_from_string (mdctx, tmp, 0); - gpr_free (tmp); + tmp = gpr_strvec_flatten(&v, NULL); + gpr_strvec_destroy(&v); + result = grpc_mdstr_from_string(mdctx, tmp, 0); + gpr_free(tmp); return result; } /* Constructor for channel_data */ -static void -init_channel_elem (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_channel * master, const grpc_channel_args * channel_args, grpc_mdctx * mdctx, int is_first, int is_last) -{ +static void init_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, grpc_channel *master, + const grpc_channel_args *channel_args, + grpc_mdctx *mdctx, int is_first, int is_last) { /* grab pointers to our data from the channel element */ channel_data *channeld = elem->channel_data; /* The first and the last filters tend to be implemented differently to handle the case that there's no 'next' filter to call on the up or down path */ - GPR_ASSERT (!is_last); + GPR_ASSERT(!is_last); /* initialize members */ - channeld->te_trailers = grpc_mdelem_from_strings (mdctx, "te", "trailers"); - channeld->method = grpc_mdelem_from_strings (mdctx, ":method", "POST"); - channeld->scheme = grpc_mdelem_from_strings (mdctx, ":scheme", scheme_from_args (channel_args)); - channeld->content_type = grpc_mdelem_from_strings (mdctx, "content-type", "application/grpc"); - channeld->status = grpc_mdelem_from_strings (mdctx, ":status", "200"); - channeld->user_agent = grpc_mdelem_from_metadata_strings (mdctx, grpc_mdstr_from_string (mdctx, "user-agent", 0), user_agent_from_args (mdctx, channel_args)); + channeld->te_trailers = grpc_mdelem_from_strings(mdctx, "te", "trailers"); + channeld->method = grpc_mdelem_from_strings(mdctx, ":method", "POST"); + channeld->scheme = grpc_mdelem_from_strings(mdctx, ":scheme", + scheme_from_args(channel_args)); + channeld->content_type = + grpc_mdelem_from_strings(mdctx, "content-type", "application/grpc"); + channeld->status = grpc_mdelem_from_strings(mdctx, ":status", "200"); + channeld->user_agent = grpc_mdelem_from_metadata_strings( + mdctx, grpc_mdstr_from_string(mdctx, "user-agent", 0), + user_agent_from_args(mdctx, channel_args)); } /* Destructor for channel data */ -static void -destroy_channel_elem (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem) -{ +static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem) { /* grab pointers to our data from the channel element */ channel_data *channeld = elem->channel_data; - GRPC_MDELEM_UNREF (channeld->te_trailers); - GRPC_MDELEM_UNREF (channeld->method); - GRPC_MDELEM_UNREF (channeld->scheme); - GRPC_MDELEM_UNREF (channeld->content_type); - GRPC_MDELEM_UNREF (channeld->status); - GRPC_MDELEM_UNREF (channeld->user_agent); + GRPC_MDELEM_UNREF(channeld->te_trailers); + GRPC_MDELEM_UNREF(channeld->method); + GRPC_MDELEM_UNREF(channeld->scheme); + GRPC_MDELEM_UNREF(channeld->content_type); + GRPC_MDELEM_UNREF(channeld->status); + GRPC_MDELEM_UNREF(channeld->user_agent); } const grpc_channel_filter grpc_http_client_filter = { - hc_start_transport_op, grpc_channel_next_op, sizeof (call_data), - init_call_elem, destroy_call_elem, sizeof (channel_data), - init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, - "http-client" -}; + hc_start_transport_op, grpc_channel_next_op, sizeof(call_data), + init_call_elem, destroy_call_elem, sizeof(channel_data), + init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + "http-client"}; diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index 6f776b8431..02a351d7b1 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -37,8 +37,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> -typedef struct call_data -{ +typedef struct call_data { gpr_uint8 got_initial_metadata; gpr_uint8 seen_path; gpr_uint8 seen_post; @@ -58,8 +57,7 @@ typedef struct call_data grpc_closure hs_on_recv; } call_data; -typedef struct channel_data -{ +typedef struct channel_data { grpc_mdelem *te_trailers; grpc_mdelem *method_post; grpc_mdelem *http_scheme; @@ -76,268 +74,234 @@ typedef struct channel_data grpc_mdctx *mdctx; } channel_data; -typedef struct -{ +typedef struct { grpc_call_element *elem; grpc_exec_ctx *exec_ctx; } server_filter_args; -static grpc_mdelem * -server_filter (void *user_data, grpc_mdelem * md) -{ +static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { server_filter_args *a = user_data; grpc_call_element *elem = a->elem; channel_data *channeld = elem->channel_data; call_data *calld = elem->call_data; /* Check if it is one of the headers we care about. */ - if (md == channeld->te_trailers || md == channeld->method_post || md == channeld->http_scheme || md == channeld->https_scheme || md == channeld->grpc_scheme || md == channeld->content_type) - { - /* swallow it */ - if (md == channeld->method_post) - { - calld->seen_post = 1; - } - else if (md->key == channeld->http_scheme->key) - { - calld->seen_scheme = 1; - } - else if (md == channeld->te_trailers) - { - calld->seen_te_trailers = 1; - } - /* TODO(klempner): Track that we've seen all the headers we should - require */ - return NULL; + if (md == channeld->te_trailers || md == channeld->method_post || + md == channeld->http_scheme || md == channeld->https_scheme || + md == channeld->grpc_scheme || md == channeld->content_type) { + /* swallow it */ + if (md == channeld->method_post) { + calld->seen_post = 1; + } else if (md->key == channeld->http_scheme->key) { + calld->seen_scheme = 1; + } else if (md == channeld->te_trailers) { + calld->seen_te_trailers = 1; } - else if (md->key == channeld->content_type->key) - { - if (strncmp (grpc_mdstr_as_c_string (md->value), "application/grpc+", 17) == 0) - { - /* Although the C implementation doesn't (currently) generate them, - any custom +-suffix is explicitly valid. */ - /* TODO(klempner): We should consider preallocating common values such - as +proto or +json, or at least stashing them if we see them. */ - /* TODO(klempner): Should we be surfacing this to application code? */ - } - else - { - /* TODO(klempner): We're currently allowing this, but we shouldn't - see it without a proxy so log for now. */ - gpr_log (GPR_INFO, "Unexpected content-type %s", channeld->content_type->key); - } - return NULL; + /* TODO(klempner): Track that we've seen all the headers we should + require */ + return NULL; + } else if (md->key == channeld->content_type->key) { + if (strncmp(grpc_mdstr_as_c_string(md->value), "application/grpc+", 17) == + 0) { + /* Although the C implementation doesn't (currently) generate them, + any custom +-suffix is explicitly valid. */ + /* TODO(klempner): We should consider preallocating common values such + as +proto or +json, or at least stashing them if we see them. */ + /* TODO(klempner): Should we be surfacing this to application code? */ + } else { + /* TODO(klempner): We're currently allowing this, but we shouldn't + see it without a proxy so log for now. */ + gpr_log(GPR_INFO, "Unexpected content-type %s", + channeld->content_type->key); } - else if (md->key == channeld->te_trailers->key || md->key == channeld->method_post->key || md->key == channeld->http_scheme->key) - { - gpr_log (GPR_ERROR, "Invalid %s: header: '%s'", grpc_mdstr_as_c_string (md->key), grpc_mdstr_as_c_string (md->value)); - /* swallow it and error everything out. */ - /* TODO(klempner): We ought to generate more descriptive error messages - on the wire here. */ - grpc_call_element_send_cancel (a->exec_ctx, elem); + return NULL; + } else if (md->key == channeld->te_trailers->key || + md->key == channeld->method_post->key || + md->key == channeld->http_scheme->key) { + gpr_log(GPR_ERROR, "Invalid %s: header: '%s'", + grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value)); + /* swallow it and error everything out. */ + /* TODO(klempner): We ought to generate more descriptive error messages + on the wire here. */ + grpc_call_element_send_cancel(a->exec_ctx, elem); + return NULL; + } else if (md->key == channeld->path_key) { + if (calld->seen_path) { + gpr_log(GPR_ERROR, "Received :path twice"); return NULL; } - else if (md->key == channeld->path_key) - { - if (calld->seen_path) - { - gpr_log (GPR_ERROR, "Received :path twice"); - return NULL; - } - calld->seen_path = 1; - return md; - } - else if (md->key == channeld->authority_key) - { - calld->seen_authority = 1; - return md; - } - else if (md->key == channeld->host_key) - { - /* translate host to :authority since :authority may be - omitted */ - grpc_mdelem *authority = grpc_mdelem_from_metadata_strings (channeld->mdctx, GRPC_MDSTR_REF (channeld->authority_key), - GRPC_MDSTR_REF (md->value)); - GRPC_MDELEM_UNREF (md); - calld->seen_authority = 1; - return authority; - } - else - { - return md; - } + calld->seen_path = 1; + return md; + } else if (md->key == channeld->authority_key) { + calld->seen_authority = 1; + return md; + } else if (md->key == channeld->host_key) { + /* translate host to :authority since :authority may be + omitted */ + grpc_mdelem *authority = grpc_mdelem_from_metadata_strings( + channeld->mdctx, GRPC_MDSTR_REF(channeld->authority_key), + GRPC_MDSTR_REF(md->value)); + GRPC_MDELEM_UNREF(md); + calld->seen_authority = 1; + return authority; + } else { + return md; + } } -static void -hs_on_recv (grpc_exec_ctx * exec_ctx, void *user_data, int success) -{ +static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, int success) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; - if (success) - { - size_t i; - size_t nops = calld->recv_ops->nops; - grpc_stream_op *ops = calld->recv_ops->ops; - for (i = 0; i < nops; i++) - { - grpc_stream_op *op = &ops[i]; - server_filter_args a; - if (op->type != GRPC_OP_METADATA) - continue; - calld->got_initial_metadata = 1; - a.elem = elem; - a.exec_ctx = exec_ctx; - grpc_metadata_batch_filter (&op->data.metadata, server_filter, &a); - /* Have we seen the required http2 transport headers? - (:method, :scheme, content-type, with :path and :authority covered - at the channel level right now) */ - if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers && calld->seen_path && calld->seen_authority) - { - /* do nothing */ - } - else - { - if (!calld->seen_path) - { - gpr_log (GPR_ERROR, "Missing :path header"); - } - if (!calld->seen_authority) - { - gpr_log (GPR_ERROR, "Missing :authority header"); - } - if (!calld->seen_post) - { - gpr_log (GPR_ERROR, "Missing :method header"); - } - if (!calld->seen_scheme) - { - gpr_log (GPR_ERROR, "Missing :scheme header"); - } - if (!calld->seen_te_trailers) - { - gpr_log (GPR_ERROR, "Missing te trailers header"); - } - /* Error this call out */ - success = 0; - grpc_call_element_send_cancel (exec_ctx, elem); - } - } + if (success) { + size_t i; + size_t nops = calld->recv_ops->nops; + grpc_stream_op *ops = calld->recv_ops->ops; + for (i = 0; i < nops; i++) { + grpc_stream_op *op = &ops[i]; + server_filter_args a; + if (op->type != GRPC_OP_METADATA) continue; + calld->got_initial_metadata = 1; + a.elem = elem; + a.exec_ctx = exec_ctx; + grpc_metadata_batch_filter(&op->data.metadata, server_filter, &a); + /* Have we seen the required http2 transport headers? + (:method, :scheme, content-type, with :path and :authority covered + at the channel level right now) */ + if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers && + calld->seen_path && calld->seen_authority) { + /* do nothing */ + } else { + if (!calld->seen_path) { + gpr_log(GPR_ERROR, "Missing :path header"); + } + if (!calld->seen_authority) { + gpr_log(GPR_ERROR, "Missing :authority header"); + } + if (!calld->seen_post) { + gpr_log(GPR_ERROR, "Missing :method header"); + } + if (!calld->seen_scheme) { + gpr_log(GPR_ERROR, "Missing :scheme header"); + } + if (!calld->seen_te_trailers) { + gpr_log(GPR_ERROR, "Missing te trailers header"); + } + /* Error this call out */ + success = 0; + grpc_call_element_send_cancel(exec_ctx, elem); + } } - calld->on_done_recv->cb (exec_ctx, calld->on_done_recv->cb_arg, success); + } + calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, success); } -static void -hs_mutate_op (grpc_call_element * elem, grpc_transport_stream_op * op) -{ +static void hs_mutate_op(grpc_call_element *elem, + grpc_transport_stream_op *op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; size_t i; - if (op->send_ops && !calld->sent_status) - { - size_t nops = op->send_ops->nops; - grpc_stream_op *ops = op->send_ops->ops; - for (i = 0; i < nops; i++) - { - grpc_stream_op *op = &ops[i]; - if (op->type != GRPC_OP_METADATA) - continue; - calld->sent_status = 1; - grpc_metadata_batch_add_head (&op->data.metadata, &calld->status, GRPC_MDELEM_REF (channeld->status_ok)); - grpc_metadata_batch_add_tail (&op->data.metadata, &calld->content_type, GRPC_MDELEM_REF (channeld->content_type)); - break; - } + if (op->send_ops && !calld->sent_status) { + size_t nops = op->send_ops->nops; + grpc_stream_op *ops = op->send_ops->ops; + for (i = 0; i < nops; i++) { + grpc_stream_op *op = &ops[i]; + if (op->type != GRPC_OP_METADATA) continue; + calld->sent_status = 1; + grpc_metadata_batch_add_head(&op->data.metadata, &calld->status, + GRPC_MDELEM_REF(channeld->status_ok)); + grpc_metadata_batch_add_tail(&op->data.metadata, &calld->content_type, + GRPC_MDELEM_REF(channeld->content_type)); + break; } + } - if (op->recv_ops && !calld->got_initial_metadata) - { - /* substitute our callback for the higher callback */ - calld->recv_ops = op->recv_ops; - calld->on_done_recv = op->on_done_recv; - op->on_done_recv = &calld->hs_on_recv; - } + if (op->recv_ops && !calld->got_initial_metadata) { + /* substitute our callback for the higher callback */ + calld->recv_ops = op->recv_ops; + calld->on_done_recv = op->on_done_recv; + op->on_done_recv = &calld->hs_on_recv; + } } -static void -hs_start_transport_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op) -{ - GRPC_CALL_LOG_OP (GPR_INFO, elem, op); - hs_mutate_op (elem, op); - grpc_call_next_op (exec_ctx, elem, op); +static void hs_start_transport_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op) { + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + hs_mutate_op(elem, op); + grpc_call_next_op(exec_ctx, elem, op); } /* Constructor for call_data */ -static void -init_call_elem (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, const void *server_transport_data, grpc_transport_stream_op * initial_op) -{ +static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + const void *server_transport_data, + grpc_transport_stream_op *initial_op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; /* initialize members */ - memset (calld, 0, sizeof (*calld)); - grpc_closure_init (&calld->hs_on_recv, hs_on_recv, elem); - if (initial_op) - hs_mutate_op (elem, initial_op); + memset(calld, 0, sizeof(*calld)); + grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem); + if (initial_op) hs_mutate_op(elem, initial_op); } /* Destructor for call_data */ -static void -destroy_call_elem (grpc_exec_ctx * exec_ctx, grpc_call_element * elem) -{ -} +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) {} /* Constructor for channel_data */ -static void -init_channel_elem (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_channel * master, const grpc_channel_args * args, grpc_mdctx * mdctx, int is_first, int is_last) -{ +static void init_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, grpc_channel *master, + const grpc_channel_args *args, grpc_mdctx *mdctx, + int is_first, int is_last) { /* grab pointers to our data from the channel element */ channel_data *channeld = elem->channel_data; /* The first and the last filters tend to be implemented differently to handle the case that there's no 'next' filter to call on the up or down path */ - GPR_ASSERT (!is_first); - GPR_ASSERT (!is_last); + GPR_ASSERT(!is_first); + GPR_ASSERT(!is_last); /* initialize members */ - channeld->te_trailers = grpc_mdelem_from_strings (mdctx, "te", "trailers"); - channeld->status_ok = grpc_mdelem_from_strings (mdctx, ":status", "200"); - channeld->status_not_found = grpc_mdelem_from_strings (mdctx, ":status", "404"); - channeld->method_post = grpc_mdelem_from_strings (mdctx, ":method", "POST"); - channeld->http_scheme = grpc_mdelem_from_strings (mdctx, ":scheme", "http"); - channeld->https_scheme = grpc_mdelem_from_strings (mdctx, ":scheme", "https"); - channeld->grpc_scheme = grpc_mdelem_from_strings (mdctx, ":scheme", "grpc"); - channeld->path_key = grpc_mdstr_from_string (mdctx, ":path", 0); - channeld->authority_key = grpc_mdstr_from_string (mdctx, ":authority", 0); - channeld->host_key = grpc_mdstr_from_string (mdctx, "host", 0); - channeld->content_type = grpc_mdelem_from_strings (mdctx, "content-type", "application/grpc"); + channeld->te_trailers = grpc_mdelem_from_strings(mdctx, "te", "trailers"); + channeld->status_ok = grpc_mdelem_from_strings(mdctx, ":status", "200"); + channeld->status_not_found = + grpc_mdelem_from_strings(mdctx, ":status", "404"); + channeld->method_post = grpc_mdelem_from_strings(mdctx, ":method", "POST"); + channeld->http_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "http"); + channeld->https_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "https"); + channeld->grpc_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "grpc"); + channeld->path_key = grpc_mdstr_from_string(mdctx, ":path", 0); + channeld->authority_key = grpc_mdstr_from_string(mdctx, ":authority", 0); + channeld->host_key = grpc_mdstr_from_string(mdctx, "host", 0); + channeld->content_type = + grpc_mdelem_from_strings(mdctx, "content-type", "application/grpc"); channeld->mdctx = mdctx; } /* Destructor for channel data */ -static void -destroy_channel_elem (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem) -{ +static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem) { /* grab pointers to our data from the channel element */ channel_data *channeld = elem->channel_data; - GRPC_MDELEM_UNREF (channeld->te_trailers); - GRPC_MDELEM_UNREF (channeld->status_ok); - GRPC_MDELEM_UNREF (channeld->status_not_found); - GRPC_MDELEM_UNREF (channeld->method_post); - GRPC_MDELEM_UNREF (channeld->http_scheme); - GRPC_MDELEM_UNREF (channeld->https_scheme); - GRPC_MDELEM_UNREF (channeld->grpc_scheme); - GRPC_MDELEM_UNREF (channeld->content_type); - GRPC_MDSTR_UNREF (channeld->path_key); - GRPC_MDSTR_UNREF (channeld->authority_key); - GRPC_MDSTR_UNREF (channeld->host_key); + GRPC_MDELEM_UNREF(channeld->te_trailers); + GRPC_MDELEM_UNREF(channeld->status_ok); + GRPC_MDELEM_UNREF(channeld->status_not_found); + GRPC_MDELEM_UNREF(channeld->method_post); + GRPC_MDELEM_UNREF(channeld->http_scheme); + GRPC_MDELEM_UNREF(channeld->https_scheme); + GRPC_MDELEM_UNREF(channeld->grpc_scheme); + GRPC_MDELEM_UNREF(channeld->content_type); + GRPC_MDSTR_UNREF(channeld->path_key); + GRPC_MDSTR_UNREF(channeld->authority_key); + GRPC_MDSTR_UNREF(channeld->host_key); } const grpc_channel_filter grpc_http_server_filter = { - hs_start_transport_op, grpc_channel_next_op, sizeof (call_data), - init_call_elem, destroy_call_elem, sizeof (channel_data), - init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, - "http-server" -}; + hs_start_transport_op, grpc_channel_next_op, sizeof(call_data), + init_call_elem, destroy_call_elem, sizeof(channel_data), + init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + "http-server"}; diff --git a/src/core/channel/noop_filter.c b/src/core/channel/noop_filter.c index 7e566642f9..91b30d61ca 100644 --- a/src/core/channel/noop_filter.c +++ b/src/core/channel/noop_filter.c @@ -34,31 +34,25 @@ #include "src/core/channel/noop_filter.h" #include <grpc/support/log.h> -typedef struct call_data -{ - int unused; /* C89 requires at least one struct element */ +typedef struct call_data { + int unused; /* C89 requires at least one struct element */ } call_data; -typedef struct channel_data -{ - int unused; /* C89 requires at least one struct element */ +typedef struct channel_data { + int unused; /* C89 requires at least one struct element */ } channel_data; /* used to silence 'variable not used' warnings */ -static void -ignore_unused (void *ignored) -{ -} +static void ignore_unused(void *ignored) {} -static void -noop_mutate_op (grpc_call_element * elem, grpc_transport_stream_op * op) -{ +static void noop_mutate_op(grpc_call_element *elem, + grpc_transport_stream_op *op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; - ignore_unused (calld); - ignore_unused (channeld); + ignore_unused(calld); + ignore_unused(channeld); /* do nothing */ } @@ -68,19 +62,19 @@ noop_mutate_op (grpc_call_element * elem, grpc_transport_stream_op * op) - a network event (or similar) from below, to receive something op contains type and call direction information, in addition to the data that is being sent or received. */ -static void -noop_start_transport_stream_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op) -{ - noop_mutate_op (elem, op); +static void noop_start_transport_stream_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op) { + noop_mutate_op(elem, op); /* pass control down the stack */ - grpc_call_next_op (exec_ctx, elem, op); + grpc_call_next_op(exec_ctx, elem, op); } /* Constructor for call_data */ -static void -init_call_elem (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, const void *server_transport_data, grpc_transport_stream_op * initial_op) -{ +static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + const void *server_transport_data, + grpc_transport_stream_op *initial_op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; @@ -88,51 +82,47 @@ init_call_elem (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, const void * /* initialize members */ calld->unused = channeld->unused; - if (initial_op) - noop_mutate_op (elem, initial_op); + if (initial_op) noop_mutate_op(elem, initial_op); } /* Destructor for call_data */ -static void -destroy_call_elem (grpc_exec_ctx * exec_ctx, grpc_call_element * elem) -{ -} +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) {} /* Constructor for channel_data */ -static void -init_channel_elem (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_channel * master, const grpc_channel_args * args, grpc_mdctx * mdctx, int is_first, int is_last) -{ +static void init_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, grpc_channel *master, + const grpc_channel_args *args, grpc_mdctx *mdctx, + int is_first, int is_last) { /* grab pointers to our data from the channel element */ channel_data *channeld = elem->channel_data; /* The first and the last filters tend to be implemented differently to handle the case that there's no 'next' filter to call on the up or down path */ - GPR_ASSERT (!is_first); - GPR_ASSERT (!is_last); + GPR_ASSERT(!is_first); + GPR_ASSERT(!is_last); /* initialize members */ channeld->unused = 0; } /* Destructor for channel data */ -static void -destroy_channel_elem (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem) -{ +static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem) { /* grab pointers to our data from the channel element */ channel_data *channeld = elem->channel_data; - ignore_unused (channeld); + ignore_unused(channeld); } -const grpc_channel_filter grpc_no_op_filter = { noop_start_transport_stream_op, - grpc_channel_next_op, - sizeof (call_data), - init_call_elem, - destroy_call_elem, - sizeof (channel_data), - init_channel_elem, - destroy_channel_elem, - grpc_call_next_get_peer, - "no-op" -}; +const grpc_channel_filter grpc_no_op_filter = {noop_start_transport_stream_op, + grpc_channel_next_op, + sizeof(call_data), + init_call_elem, + destroy_call_elem, + sizeof(channel_data), + init_channel_elem, + destroy_channel_elem, + grpc_call_next_get_peer, + "no-op"}; |