diff options
author | Craig Tiller <ctiller@google.com> | 2015-09-22 10:42:19 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-09-22 10:42:19 -0700 |
commit | 45724b35e411fef7c5da66a74c78428c11d56843 (patch) | |
tree | 9264034aca675c89444e02f72ef58e67d7043604 /src/core/channel | |
parent | 298751c1195523ef6228595043b583c3a6270e08 (diff) |
indent pass to get logical source lines on one physical line
Diffstat (limited to 'src/core/channel')
-rw-r--r-- | src/core/channel/channel_args.c | 251 | ||||
-rw-r--r-- | src/core/channel/channel_args.h | 25 | ||||
-rw-r--r-- | src/core/channel/channel_stack.c | 189 | ||||
-rw-r--r-- | src/core/channel/channel_stack.h | 94 | ||||
-rw-r--r-- | src/core/channel/client_channel.c | 1040 | ||||
-rw-r--r-- | src/core/channel/client_channel.h | 23 | ||||
-rw-r--r-- | src/core/channel/compress_filter.c | 452 | ||||
-rw-r--r-- | src/core/channel/connected_channel.c | 121 | ||||
-rw-r--r-- | src/core/channel/connected_channel.h | 3 | ||||
-rw-r--r-- | src/core/channel/context.h | 8 | ||||
-rw-r--r-- | src/core/channel/http_client_filter.c | 315 | ||||
-rw-r--r-- | src/core/channel/http_server_filter.c | 384 | ||||
-rw-r--r-- | src/core/channel/noop_filter.c | 89 |
13 files changed, 1560 insertions, 1434 deletions
diff --git a/src/core/channel/channel_args.c b/src/core/channel/channel_args.c index 487db1119a..4207a97114 100644 --- a/src/core/channel/channel_args.c +++ b/src/core/channel/channel_args.c @@ -41,169 +41,206 @@ #include <string.h> -static grpc_arg copy_arg(const grpc_arg *src) { +static grpc_arg +copy_arg (const grpc_arg * src) +{ grpc_arg dst; dst.type = src->type; - dst.key = gpr_strdup(src->key); - switch (dst.type) { + dst.key = gpr_strdup (src->key); + switch (dst.type) + { case GRPC_ARG_STRING: - dst.value.string = gpr_strdup(src->value.string); + dst.value.string = gpr_strdup (src->value.string); break; case GRPC_ARG_INTEGER: dst.value.integer = src->value.integer; break; case GRPC_ARG_POINTER: dst.value.pointer = src->value.pointer; - dst.value.pointer.p = src->value.pointer.copy - ? src->value.pointer.copy(src->value.pointer.p) - : src->value.pointer.p; + dst.value.pointer.p = src->value.pointer.copy ? src->value.pointer.copy (src->value.pointer.p) : src->value.pointer.p; break; - } + } return dst; } -grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src, - const grpc_arg *to_add, - size_t num_to_add) { - grpc_channel_args *dst = gpr_malloc(sizeof(grpc_channel_args)); +grpc_channel_args * +grpc_channel_args_copy_and_add (const grpc_channel_args * src, const grpc_arg * to_add, size_t num_to_add) +{ + grpc_channel_args *dst = gpr_malloc (sizeof (grpc_channel_args)); size_t i; size_t src_num_args = (src == NULL) ? 0 : src->num_args; - if (!src && !to_add) { - dst->num_args = 0; - dst->args = NULL; - return dst; - } + if (!src && !to_add) + { + dst->num_args = 0; + dst->args = NULL; + return dst; + } dst->num_args = src_num_args + num_to_add; - dst->args = gpr_malloc(sizeof(grpc_arg) * dst->num_args); - for (i = 0; i < src_num_args; i++) { - dst->args[i] = copy_arg(&src->args[i]); - } - for (i = 0; i < num_to_add; i++) { - dst->args[i + src_num_args] = copy_arg(&to_add[i]); - } + dst->args = gpr_malloc (sizeof (grpc_arg) * dst->num_args); + for (i = 0; i < src_num_args; i++) + { + dst->args[i] = copy_arg (&src->args[i]); + } + for (i = 0; i < num_to_add; i++) + { + dst->args[i + src_num_args] = copy_arg (&to_add[i]); + } return dst; } -grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src) { - return grpc_channel_args_copy_and_add(src, NULL, 0); +grpc_channel_args * +grpc_channel_args_copy (const grpc_channel_args * src) +{ + return grpc_channel_args_copy_and_add (src, NULL, 0); } -grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a, - const grpc_channel_args *b) { - return grpc_channel_args_copy_and_add(a, b->args, b->num_args); +grpc_channel_args * +grpc_channel_args_merge (const grpc_channel_args * a, const grpc_channel_args * b) +{ + return grpc_channel_args_copy_and_add (a, b->args, b->num_args); } -void grpc_channel_args_destroy(grpc_channel_args *a) { +void +grpc_channel_args_destroy (grpc_channel_args * a) +{ size_t i; - for (i = 0; i < a->num_args; i++) { - switch (a->args[i].type) { - case GRPC_ARG_STRING: - gpr_free(a->args[i].value.string); - break; - case GRPC_ARG_INTEGER: - break; - case GRPC_ARG_POINTER: - if (a->args[i].value.pointer.destroy) { - a->args[i].value.pointer.destroy(a->args[i].value.pointer.p); - } - break; + for (i = 0; i < a->num_args; i++) + { + switch (a->args[i].type) + { + case GRPC_ARG_STRING: + gpr_free (a->args[i].value.string); + break; + case GRPC_ARG_INTEGER: + break; + case GRPC_ARG_POINTER: + if (a->args[i].value.pointer.destroy) + { + a->args[i].value.pointer.destroy (a->args[i].value.pointer.p); + } + break; + } + gpr_free (a->args[i].key); } - gpr_free(a->args[i].key); - } - gpr_free(a->args); - gpr_free(a); + gpr_free (a->args); + gpr_free (a); } -int grpc_channel_args_is_census_enabled(const grpc_channel_args *a) { +int +grpc_channel_args_is_census_enabled (const grpc_channel_args * a) +{ size_t i; - if (a == NULL) return 0; - for (i = 0; i < a->num_args; i++) { - if (0 == strcmp(a->args[i].key, GRPC_ARG_ENABLE_CENSUS)) { - return a->args[i].value.integer != 0; + if (a == NULL) + return 0; + for (i = 0; i < a->num_args; i++) + { + if (0 == strcmp (a->args[i].key, GRPC_ARG_ENABLE_CENSUS)) + { + return a->args[i].value.integer != 0; + } } - } return 0; } -grpc_compression_algorithm grpc_channel_args_get_compression_algorithm( - const grpc_channel_args *a) { +grpc_compression_algorithm +grpc_channel_args_get_compression_algorithm (const grpc_channel_args * a) +{ size_t i; - if (a == NULL) return 0; - for (i = 0; i < a->num_args; ++i) { - if (a->args[i].type == GRPC_ARG_INTEGER && - !strcmp(GRPC_COMPRESSION_ALGORITHM_ARG, a->args[i].key)) { - return (grpc_compression_algorithm)a->args[i].value.integer; - break; + if (a == NULL) + return 0; + for (i = 0; i < a->num_args; ++i) + { + if (a->args[i].type == GRPC_ARG_INTEGER && !strcmp (GRPC_COMPRESSION_ALGORITHM_ARG, a->args[i].key)) + { + return (grpc_compression_algorithm) a->args[i].value.integer; + break; + } } - } return GRPC_COMPRESS_NONE; } -grpc_channel_args *grpc_channel_args_set_compression_algorithm( - grpc_channel_args *a, grpc_compression_algorithm algorithm) { +grpc_channel_args * +grpc_channel_args_set_compression_algorithm (grpc_channel_args * a, grpc_compression_algorithm algorithm) +{ grpc_arg tmp; tmp.type = GRPC_ARG_INTEGER; tmp.key = GRPC_COMPRESSION_ALGORITHM_ARG; tmp.value.integer = algorithm; - return grpc_channel_args_copy_and_add(a, &tmp, 1); + return grpc_channel_args_copy_and_add (a, &tmp, 1); } /** Returns 1 if the argument for compression algorithm's enabled states bitset * was found in \a a, returning the arg's value in \a states. Otherwise, returns * 0. */ -static int find_compression_algorithm_states_bitset(const grpc_channel_args *a, - int **states_arg) { - if (a != NULL) { - size_t i; - for (i = 0; i < a->num_args; ++i) { - if (a->args[i].type == GRPC_ARG_INTEGER && - !strcmp(GRPC_COMPRESSION_ALGORITHM_STATE_ARG, a->args[i].key)) { - *states_arg = &a->args[i].value.integer; - return 1; /* GPR_TRUE */ - } +static int +find_compression_algorithm_states_bitset (const grpc_channel_args * a, int **states_arg) +{ + if (a != NULL) + { + size_t i; + for (i = 0; i < a->num_args; ++i) + { + if (a->args[i].type == GRPC_ARG_INTEGER && !strcmp (GRPC_COMPRESSION_ALGORITHM_STATE_ARG, a->args[i].key)) + { + *states_arg = &a->args[i].value.integer; + return 1; /* GPR_TRUE */ + } + } } - } - return 0; /* GPR_FALSE */ + return 0; /* GPR_FALSE */ } -grpc_channel_args *grpc_channel_args_compression_algorithm_set_state( - grpc_channel_args **a, grpc_compression_algorithm algorithm, int state) { +grpc_channel_args * +grpc_channel_args_compression_algorithm_set_state (grpc_channel_args ** a, grpc_compression_algorithm algorithm, int state) +{ int *states_arg; grpc_channel_args *result = *a; - const int states_arg_found = - find_compression_algorithm_states_bitset(*a, &states_arg); + const int states_arg_found = find_compression_algorithm_states_bitset (*a, &states_arg); - if (states_arg_found) { - if (state != 0) { - GPR_BITSET((unsigned *)states_arg, algorithm); - } else { - GPR_BITCLEAR((unsigned *)states_arg, algorithm); + if (states_arg_found) + { + if (state != 0) + { + GPR_BITSET ((unsigned *) states_arg, algorithm); + } + else + { + GPR_BITCLEAR ((unsigned *) states_arg, algorithm); + } } - } else { - /* create a new arg */ - grpc_arg tmp; - tmp.type = GRPC_ARG_INTEGER; - tmp.key = GRPC_COMPRESSION_ALGORITHM_STATE_ARG; - /* all enabled by default */ - tmp.value.integer = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; - if (state != 0) { - GPR_BITSET((unsigned *)&tmp.value.integer, algorithm); - } else { - GPR_BITCLEAR((unsigned *)&tmp.value.integer, algorithm); + else + { + /* create a new arg */ + grpc_arg tmp; + tmp.type = GRPC_ARG_INTEGER; + tmp.key = GRPC_COMPRESSION_ALGORITHM_STATE_ARG; + /* all enabled by default */ + tmp.value.integer = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; + if (state != 0) + { + GPR_BITSET ((unsigned *) &tmp.value.integer, algorithm); + } + else + { + GPR_BITCLEAR ((unsigned *) &tmp.value.integer, algorithm); + } + result = grpc_channel_args_copy_and_add (*a, &tmp, 1); + grpc_channel_args_destroy (*a); + *a = result; } - result = grpc_channel_args_copy_and_add(*a, &tmp, 1); - grpc_channel_args_destroy(*a); - *a = result; - } return result; } -int grpc_channel_args_compression_algorithm_get_states( - const grpc_channel_args *a) { +int +grpc_channel_args_compression_algorithm_get_states (const grpc_channel_args * a) +{ int *states_arg; - if (find_compression_algorithm_states_bitset(a, &states_arg)) { - return *states_arg; - } else { - return (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; /* All algs. enabled */ - } + if (find_compression_algorithm_states_bitset (a, &states_arg)) + { + return *states_arg; + } + else + { + return (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; /* All algs. enabled */ + } } diff --git a/src/core/channel/channel_args.h b/src/core/channel/channel_args.h index f9e7b05860..bcfd8c46a0 100644 --- a/src/core/channel/channel_args.h +++ b/src/core/channel/channel_args.h @@ -38,34 +38,29 @@ #include <grpc/grpc.h> /* Copy some arguments */ -grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src); +grpc_channel_args *grpc_channel_args_copy (const grpc_channel_args * src); /** Copy some arguments and add the to_add parameter in the end. If to_add is NULL, it is equivalent to call grpc_channel_args_copy. */ -grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src, - const grpc_arg *to_add, - size_t num_to_add); +grpc_channel_args *grpc_channel_args_copy_and_add (const grpc_channel_args * src, const grpc_arg * to_add, size_t num_to_add); /** Copy args from a then args from b into a new channel args */ -grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a, - const grpc_channel_args *b); +grpc_channel_args *grpc_channel_args_merge (const grpc_channel_args * a, const grpc_channel_args * b); /** Destroy arguments created by grpc_channel_args_copy */ -void grpc_channel_args_destroy(grpc_channel_args *a); +void grpc_channel_args_destroy (grpc_channel_args * a); /** Reads census_enabled settings from channel args. Returns 1 if census_enabled * is specified in channel args, otherwise returns 0. */ -int grpc_channel_args_is_census_enabled(const grpc_channel_args *a); +int grpc_channel_args_is_census_enabled (const grpc_channel_args * a); /** Returns the compression algorithm set in \a a. */ -grpc_compression_algorithm grpc_channel_args_get_compression_algorithm( - const grpc_channel_args *a); +grpc_compression_algorithm grpc_channel_args_get_compression_algorithm (const grpc_channel_args * a); /** Returns a channel arg instance with compression enabled. If \a a is * non-NULL, its args are copied. N.B. GRPC_COMPRESS_NONE disables compression * for the channel. */ -grpc_channel_args *grpc_channel_args_set_compression_algorithm( - grpc_channel_args *a, grpc_compression_algorithm algorithm); +grpc_channel_args *grpc_channel_args_set_compression_algorithm (grpc_channel_args * a, grpc_compression_algorithm algorithm); /** Sets the support for the given compression algorithm. By default, all * compression algorithms are enabled. It's an error to disable an algorithm set @@ -74,15 +69,13 @@ grpc_channel_args *grpc_channel_args_set_compression_algorithm( * Returns an instance will the updated algorithm states. The \a a pointer is * modified to point to the returned instance (which may be different from the * input value of \a a). */ -grpc_channel_args *grpc_channel_args_compression_algorithm_set_state( - grpc_channel_args **a, grpc_compression_algorithm algorithm, int enabled); +grpc_channel_args *grpc_channel_args_compression_algorithm_set_state (grpc_channel_args ** a, grpc_compression_algorithm algorithm, int enabled); /** Returns the bitset representing the support state (true for enabled, false * for disabled) for compression algorithms. * * The i-th bit of the returned bitset corresponds to the i-th entry in the * grpc_compression_algorithm enum. */ -int grpc_channel_args_compression_algorithm_get_states( - const grpc_channel_args *a); +int grpc_channel_args_compression_algorithm_get_states (const grpc_channel_args * a); #endif /* GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_ARGS_H */ diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index 47f1f8e828..aafc0ede9c 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -59,21 +59,20 @@ int grpc_trace_channel = 0; #define ROUND_UP_TO_ALIGNMENT_SIZE(x) \ (((x) + GPR_MAX_ALIGNMENT - 1u) & ~(GPR_MAX_ALIGNMENT - 1u)) -size_t grpc_channel_stack_size(const grpc_channel_filter **filters, - size_t filter_count) { +size_t +grpc_channel_stack_size (const grpc_channel_filter ** filters, size_t filter_count) +{ /* always need the header, and size for the channel elements */ - size_t size = - ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_channel_stack)) + - ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_channel_element)); + size_t size = ROUND_UP_TO_ALIGNMENT_SIZE (sizeof (grpc_channel_stack)) + ROUND_UP_TO_ALIGNMENT_SIZE (filter_count * sizeof (grpc_channel_element)); size_t i; - GPR_ASSERT((GPR_MAX_ALIGNMENT & (GPR_MAX_ALIGNMENT - 1)) == 0 && - "GPR_MAX_ALIGNMENT must be a power of two"); + GPR_ASSERT ((GPR_MAX_ALIGNMENT & (GPR_MAX_ALIGNMENT - 1)) == 0 && "GPR_MAX_ALIGNMENT must be a power of two"); /* add the size for each filter */ - for (i = 0; i < filter_count; i++) { - size += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_channel_data); - } + for (i = 0; i < filter_count; i++) + { + size += ROUND_UP_TO_ALIGNMENT_SIZE (filters[i]->sizeof_channel_data); + } return size; } @@ -86,144 +85,142 @@ size_t grpc_channel_stack_size(const grpc_channel_filter **filters, ((grpc_call_element *)((char *)(stk) + \ ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)))) -grpc_channel_element *grpc_channel_stack_element( - grpc_channel_stack *channel_stack, size_t index) { - return CHANNEL_ELEMS_FROM_STACK(channel_stack) + index; +grpc_channel_element * +grpc_channel_stack_element (grpc_channel_stack * channel_stack, size_t index) +{ + return CHANNEL_ELEMS_FROM_STACK (channel_stack) + index; } -grpc_channel_element *grpc_channel_stack_last_element( - grpc_channel_stack *channel_stack) { - return grpc_channel_stack_element(channel_stack, channel_stack->count - 1); +grpc_channel_element * +grpc_channel_stack_last_element (grpc_channel_stack * channel_stack) +{ + return grpc_channel_stack_element (channel_stack, channel_stack->count - 1); } -grpc_call_element *grpc_call_stack_element(grpc_call_stack *call_stack, - size_t index) { - return CALL_ELEMS_FROM_STACK(call_stack) + index; +grpc_call_element * +grpc_call_stack_element (grpc_call_stack * call_stack, size_t index) +{ + return CALL_ELEMS_FROM_STACK (call_stack) + index; } -void grpc_channel_stack_init(const grpc_channel_filter **filters, - size_t filter_count, grpc_channel *master, - const grpc_channel_args *args, - grpc_mdctx *metadata_context, - grpc_channel_stack *stack, - grpc_closure_list *closure_list) { - size_t call_size = - ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) + - ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_call_element)); +void +grpc_channel_stack_init (const grpc_channel_filter ** filters, size_t filter_count, grpc_channel * master, const grpc_channel_args * args, grpc_mdctx * metadata_context, grpc_channel_stack * stack, grpc_closure_list * closure_list) +{ + size_t call_size = ROUND_UP_TO_ALIGNMENT_SIZE (sizeof (grpc_call_stack)) + ROUND_UP_TO_ALIGNMENT_SIZE (filter_count * sizeof (grpc_call_element)); grpc_channel_element *elems; char *user_data; size_t i; stack->count = filter_count; - elems = CHANNEL_ELEMS_FROM_STACK(stack); - user_data = - ((char *)elems) + - ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_channel_element)); + elems = CHANNEL_ELEMS_FROM_STACK (stack); + user_data = ((char *) elems) + ROUND_UP_TO_ALIGNMENT_SIZE (filter_count * sizeof (grpc_channel_element)); /* init per-filter data */ - for (i = 0; i < filter_count; i++) { - elems[i].filter = filters[i]; - elems[i].channel_data = user_data; - elems[i].filter->init_channel_elem(&elems[i], master, args, - metadata_context, i == 0, - i == (filter_count - 1), closure_list); - user_data += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_channel_data); - call_size += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_call_data); - } - - GPR_ASSERT(user_data > (char *)stack); - GPR_ASSERT((gpr_uintptr)(user_data - (char *)stack) == - grpc_channel_stack_size(filters, filter_count)); + for (i = 0; i < filter_count; i++) + { + elems[i].filter = filters[i]; + elems[i].channel_data = user_data; + elems[i].filter->init_channel_elem (&elems[i], master, args, metadata_context, i == 0, i == (filter_count - 1), closure_list); + user_data += ROUND_UP_TO_ALIGNMENT_SIZE (filters[i]->sizeof_channel_data); + call_size += ROUND_UP_TO_ALIGNMENT_SIZE (filters[i]->sizeof_call_data); + } + + GPR_ASSERT (user_data > (char *) stack); + GPR_ASSERT ((gpr_uintptr) (user_data - (char *) stack) == grpc_channel_stack_size (filters, filter_count)); stack->call_stack_size = call_size; } -void grpc_channel_stack_destroy(grpc_channel_stack *stack, - grpc_closure_list *closure_list) { - grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(stack); +void +grpc_channel_stack_destroy (grpc_channel_stack * stack, grpc_closure_list * closure_list) +{ + grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK (stack); size_t count = stack->count; size_t i; /* destroy per-filter data */ - for (i = 0; i < count; i++) { - channel_elems[i].filter->destroy_channel_elem(&channel_elems[i], - closure_list); - } + for (i = 0; i < count; i++) + { + channel_elems[i].filter->destroy_channel_elem (&channel_elems[i], closure_list); + } } -void grpc_call_stack_init(grpc_channel_stack *channel_stack, - const void *transport_server_data, - grpc_transport_stream_op *initial_op, - grpc_call_stack *call_stack, - grpc_closure_list *closure_list) { - grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack); +void +grpc_call_stack_init (grpc_channel_stack * channel_stack, const void *transport_server_data, grpc_transport_stream_op * initial_op, grpc_call_stack * call_stack, grpc_closure_list * closure_list) +{ + grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK (channel_stack); size_t count = channel_stack->count; grpc_call_element *call_elems; char *user_data; size_t i; call_stack->count = count; - call_elems = CALL_ELEMS_FROM_STACK(call_stack); - user_data = ((char *)call_elems) + - ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element)); + call_elems = CALL_ELEMS_FROM_STACK (call_stack); + user_data = ((char *) call_elems) + ROUND_UP_TO_ALIGNMENT_SIZE (count * sizeof (grpc_call_element)); /* init per-filter data */ - for (i = 0; i < count; i++) { - call_elems[i].filter = channel_elems[i].filter; - call_elems[i].channel_data = channel_elems[i].channel_data; - call_elems[i].call_data = user_data; - call_elems[i].filter->init_call_elem(&call_elems[i], transport_server_data, - initial_op, closure_list); - user_data += - ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data); - } + for (i = 0; i < count; i++) + { + call_elems[i].filter = channel_elems[i].filter; + call_elems[i].channel_data = channel_elems[i].channel_data; + call_elems[i].call_data = user_data; + call_elems[i].filter->init_call_elem (&call_elems[i], transport_server_data, initial_op, closure_list); + user_data += ROUND_UP_TO_ALIGNMENT_SIZE (call_elems[i].filter->sizeof_call_data); + } } -void grpc_call_stack_destroy(grpc_call_stack *stack, - grpc_closure_list *closure_list) { - grpc_call_element *elems = CALL_ELEMS_FROM_STACK(stack); +void +grpc_call_stack_destroy (grpc_call_stack * stack, grpc_closure_list * closure_list) +{ + grpc_call_element *elems = CALL_ELEMS_FROM_STACK (stack); size_t count = stack->count; size_t i; /* destroy per-filter data */ - for (i = 0; i < count; i++) { - elems[i].filter->destroy_call_elem(&elems[i], closure_list); - } + for (i = 0; i < count; i++) + { + elems[i].filter->destroy_call_elem (&elems[i], closure_list); + } } -void grpc_call_next_op(grpc_call_element *elem, grpc_transport_stream_op *op, - grpc_closure_list *closure_list) { +void +grpc_call_next_op (grpc_call_element * elem, grpc_transport_stream_op * op, grpc_closure_list * closure_list) +{ grpc_call_element *next_elem = elem + 1; - next_elem->filter->start_transport_stream_op(next_elem, op, closure_list); + next_elem->filter->start_transport_stream_op (next_elem, op, closure_list); } -char *grpc_call_next_get_peer(grpc_call_element *elem, - grpc_closure_list *closure_list) { +char * +grpc_call_next_get_peer (grpc_call_element * elem, grpc_closure_list * closure_list) +{ grpc_call_element *next_elem = elem + 1; - return next_elem->filter->get_peer(next_elem, closure_list); + return next_elem->filter->get_peer (next_elem, closure_list); } -void grpc_channel_next_op(grpc_channel_element *elem, grpc_transport_op *op, - grpc_closure_list *closure_list) { +void +grpc_channel_next_op (grpc_channel_element * elem, grpc_transport_op * op, grpc_closure_list * closure_list) +{ grpc_channel_element *next_elem = elem + 1; - next_elem->filter->start_transport_op(next_elem, op, closure_list); + next_elem->filter->start_transport_op (next_elem, op, closure_list); } -grpc_channel_stack *grpc_channel_stack_from_top_element( - grpc_channel_element *elem) { - return (grpc_channel_stack *)((char *)(elem)-ROUND_UP_TO_ALIGNMENT_SIZE( - sizeof(grpc_channel_stack))); +grpc_channel_stack * +grpc_channel_stack_from_top_element (grpc_channel_element * elem) +{ + return (grpc_channel_stack *) ((char *) (elem) - ROUND_UP_TO_ALIGNMENT_SIZE (sizeof (grpc_channel_stack))); } -grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) { - return (grpc_call_stack *)((char *)(elem)-ROUND_UP_TO_ALIGNMENT_SIZE( - sizeof(grpc_call_stack))); +grpc_call_stack * +grpc_call_stack_from_top_element (grpc_call_element * elem) +{ + return (grpc_call_stack *) ((char *) (elem) - ROUND_UP_TO_ALIGNMENT_SIZE (sizeof (grpc_call_stack))); } -void grpc_call_element_send_cancel(grpc_call_element *cur_elem, - grpc_closure_list *closure_list) { +void +grpc_call_element_send_cancel (grpc_call_element * cur_elem, grpc_closure_list * closure_list) +{ grpc_transport_stream_op op; - memset(&op, 0, sizeof(op)); + memset (&op, 0, sizeof (op)); op.cancel_with_status = GRPC_STATUS_CANCELLED; - grpc_call_next_op(cur_elem, &op, closure_list); + grpc_call_next_op (cur_elem, &op, closure_list); } diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index b3facd4f01..937bb04eda 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -61,17 +61,15 @@ typedef struct grpc_call_element grpc_call_element; 4. a name, which is useful when debugging Members are laid out in approximate frequency of use order. */ -typedef struct { +typedef struct +{ /* Called to eg. send/receive data on a call. See grpc_call_next_op on how to call the next element in the stack */ - void (*start_transport_stream_op)(grpc_call_element *elem, - grpc_transport_stream_op *op, - grpc_closure_list *closure_list); + void (*start_transport_stream_op) (grpc_call_element * elem, grpc_transport_stream_op * op, grpc_closure_list * closure_list); /* Called to handle channel level operations - e.g. new calls, or transport closure. See grpc_channel_next_op on how to call the next element in the stack */ - void (*start_transport_op)(grpc_channel_element *elem, grpc_transport_op *op, - grpc_closure_list *closure_list); + void (*start_transport_op) (grpc_channel_element * elem, grpc_transport_op * op, grpc_closure_list * closure_list); /* sizeof(per call data) */ size_t sizeof_call_data; @@ -82,15 +80,11 @@ typedef struct { server_transport_data is an opaque pointer. If it is NULL, this call is on a client; if it is non-NULL, then it points to memory owned by the transport and is on the server. Most filters want to ignore this - argument.*/ - void (*init_call_elem)(grpc_call_element *elem, - const void *server_transport_data, - grpc_transport_stream_op *initial_op, - grpc_closure_list *closure_list); + argument. */ + void (*init_call_elem) (grpc_call_element * elem, const void *server_transport_data, grpc_transport_stream_op * initial_op, grpc_closure_list * closure_list); /* Destroy per call data. The filter does not need to do any chaining */ - void (*destroy_call_elem)(grpc_call_element *elem, - grpc_closure_list *closure_list); + void (*destroy_call_elem) (grpc_call_element * elem, grpc_closure_list * closure_list); /* sizeof(per channel data) */ size_t sizeof_channel_data; @@ -100,17 +94,13 @@ typedef struct { is_first, is_last designate this elements position in the stack, and are useful for asserting correct configuration by upper layer code. The filter does not need to do any chaining */ - void (*init_channel_elem)(grpc_channel_element *elem, grpc_channel *master, - const grpc_channel_args *args, - grpc_mdctx *metadata_context, int is_first, - int is_last, grpc_closure_list *closure_list); + void (*init_channel_elem) (grpc_channel_element * elem, grpc_channel * master, const grpc_channel_args * args, grpc_mdctx * metadata_context, int is_first, int is_last, grpc_closure_list * closure_list); /* Destroy per channel data. The filter does not need to do any chaining */ - void (*destroy_channel_elem)(grpc_channel_element *elem, - grpc_closure_list *closure_list); + void (*destroy_channel_elem) (grpc_channel_element * elem, grpc_closure_list * closure_list); /* Implement grpc_call_get_peer() */ - char *(*get_peer)(grpc_call_element *elem, grpc_closure_list *closure_list); + char *(*get_peer) (grpc_call_element * elem, grpc_closure_list * closure_list); /* The name of this filter */ const char *name; @@ -118,7 +108,8 @@ typedef struct { /* A channel_element tracks its filter and the filter requested memory within a channel allocation */ -struct grpc_channel_element { +struct grpc_channel_element +{ const grpc_channel_filter *filter; void *channel_data; }; @@ -126,7 +117,8 @@ struct grpc_channel_element { /* A call_element tracks its filter, the filter requested memory within a channel allocation, and the filter requested memory within a call allocation */ -struct grpc_call_element { +struct grpc_call_element +{ const grpc_channel_filter *filter; void *channel_data; void *call_data; @@ -134,7 +126,8 @@ struct grpc_call_element { /* A channel stack tracks a set of related filters for one channel, and guarantees they live within a single malloc() allocation */ -typedef struct { +typedef struct +{ size_t count; /* Memory required for a call stack (computed at channel stack initialization) */ @@ -143,65 +136,48 @@ typedef struct { /* A call stack tracks a set of related filters for one call, and guarantees they live within a single malloc() allocation */ -typedef struct { size_t count; } grpc_call_stack; +typedef struct +{ + size_t count; +} grpc_call_stack; /* Get a channel element given a channel stack and its index */ -grpc_channel_element *grpc_channel_stack_element(grpc_channel_stack *stack, - size_t i); +grpc_channel_element *grpc_channel_stack_element (grpc_channel_stack * stack, size_t i); /* Get the last channel element in a channel stack */ -grpc_channel_element *grpc_channel_stack_last_element( - grpc_channel_stack *stack); +grpc_channel_element *grpc_channel_stack_last_element (grpc_channel_stack * stack); /* Get a call stack element given a call stack and an index */ -grpc_call_element *grpc_call_stack_element(grpc_call_stack *stack, size_t i); +grpc_call_element *grpc_call_stack_element (grpc_call_stack * stack, size_t i); /* Determine memory required for a channel stack containing a set of filters */ -size_t grpc_channel_stack_size(const grpc_channel_filter **filters, - size_t filter_count); +size_t grpc_channel_stack_size (const grpc_channel_filter ** filters, size_t filter_count); /* Initialize a channel stack given some filters */ -void grpc_channel_stack_init(const grpc_channel_filter **filters, - size_t filter_count, grpc_channel *master, - const grpc_channel_args *args, - grpc_mdctx *metadata_context, - grpc_channel_stack *stack, - grpc_closure_list *closure_list); +void grpc_channel_stack_init (const grpc_channel_filter ** filters, size_t filter_count, grpc_channel * master, const grpc_channel_args * args, grpc_mdctx * metadata_context, grpc_channel_stack * stack, grpc_closure_list * closure_list); /* Destroy a channel stack */ -void grpc_channel_stack_destroy(grpc_channel_stack *stack, - grpc_closure_list *closure_list); +void grpc_channel_stack_destroy (grpc_channel_stack * stack, grpc_closure_list * closure_list); /* Initialize a call stack given a channel stack. transport_server_data is expected to be NULL on a client, or an opaque transport owned pointer on the server. */ -void grpc_call_stack_init(grpc_channel_stack *channel_stack, - const void *transport_server_data, - grpc_transport_stream_op *initial_op, - grpc_call_stack *call_stack, - grpc_closure_list *closure_list); +void grpc_call_stack_init (grpc_channel_stack * channel_stack, const void *transport_server_data, grpc_transport_stream_op * initial_op, grpc_call_stack * call_stack, grpc_closure_list * closure_list); /* Destroy a call stack */ -void grpc_call_stack_destroy(grpc_call_stack *stack, - grpc_closure_list *closure_list); +void grpc_call_stack_destroy (grpc_call_stack * stack, grpc_closure_list * closure_list); /* Call the next operation in a call stack */ -void grpc_call_next_op(grpc_call_element *elem, grpc_transport_stream_op *op, - grpc_closure_list *closure_list); +void grpc_call_next_op (grpc_call_element * elem, grpc_transport_stream_op * op, grpc_closure_list * closure_list); /* Call the next operation (depending on call directionality) in a channel stack */ -void grpc_channel_next_op(grpc_channel_element *elem, grpc_transport_op *op, - grpc_closure_list *closure_list); +void grpc_channel_next_op (grpc_channel_element * elem, grpc_transport_op * op, grpc_closure_list * closure_list); /* Pass through a request to get_peer to the next child element */ -char *grpc_call_next_get_peer(grpc_call_element *elem, - grpc_closure_list *closure_list); +char *grpc_call_next_get_peer (grpc_call_element * elem, grpc_closure_list * closure_list); /* Given the top element of a channel stack, get the channel stack itself */ -grpc_channel_stack *grpc_channel_stack_from_top_element( - grpc_channel_element *elem); +grpc_channel_stack *grpc_channel_stack_from_top_element (grpc_channel_element * elem); /* Given the top element of a call stack, get the call stack itself */ -grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem); +grpc_call_stack *grpc_call_stack_from_top_element (grpc_call_element * elem); -void grpc_call_log_op(char *file, int line, gpr_log_severity severity, - grpc_call_element *elem, grpc_transport_stream_op *op); +void grpc_call_log_op (char *file, int line, gpr_log_severity severity, grpc_call_element * elem, grpc_transport_stream_op * op); -void grpc_call_element_send_cancel(grpc_call_element *cur_elem, - grpc_closure_list *closure_list); +void grpc_call_element_send_cancel (grpc_call_element * cur_elem, grpc_closure_list * closure_list); extern int grpc_trace_channel; diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 903f9eab50..40b428cf3e 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -51,7 +51,8 @@ typedef struct call_data call_data; -typedef struct { +typedef struct +{ /** metadata context for this channel */ grpc_mdctx *mdctx; /** resolver for this channel */ @@ -89,14 +90,16 @@ typedef struct { to watch for state changes from the lb_policy. When a state change is seen, we update the channel, and create a new watcher */ -typedef struct { +typedef struct +{ channel_data *chand; grpc_closure on_changed; grpc_connectivity_state state; grpc_lb_policy *lb_policy; } lb_policy_connectivity_watcher; -typedef enum { +typedef enum +{ CALL_CREATED, CALL_WAITING_FOR_SEND, CALL_WAITING_FOR_CONFIG, @@ -106,7 +109,8 @@ typedef enum { CALL_CANCELLED } call_state; -struct call_data { +struct call_data +{ /* owning element */ grpc_call_element *elem; @@ -123,367 +127,406 @@ struct call_data { grpc_linked_mdelem details; }; -static grpc_closure *merge_into_waiting_op(grpc_call_element *elem, - grpc_transport_stream_op *new_op) - GRPC_MUST_USE_RESULT; +static grpc_closure * +merge_into_waiting_op (grpc_call_element * elem, grpc_transport_stream_op * new_op) + GRPC_MUST_USE_RESULT; -static void handle_op_after_cancellation(grpc_call_element *elem, - grpc_transport_stream_op *op, - grpc_closure_list *closure_list) { + static void handle_op_after_cancellation (grpc_call_element * elem, grpc_transport_stream_op * op, grpc_closure_list * closure_list) +{ call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - if (op->send_ops) { - grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops); - op->on_done_send->cb(op->on_done_send->cb_arg, 0, closure_list); - } - if (op->recv_ops) { - char status[GPR_LTOA_MIN_BUFSIZE]; - grpc_metadata_batch mdb; - gpr_ltoa(GRPC_STATUS_CANCELLED, status); - calld->status.md = - grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status); - calld->details.md = - grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled"); - calld->status.prev = calld->details.next = NULL; - calld->status.next = &calld->details; - calld->details.prev = &calld->status; - mdb.list.head = &calld->status; - mdb.list.tail = &calld->details; - mdb.garbage.head = mdb.garbage.tail = NULL; - mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME); - grpc_sopb_add_metadata(op->recv_ops, mdb); - *op->recv_state = GRPC_STREAM_CLOSED; - op->on_done_recv->cb(op->on_done_recv->cb_arg, 1, closure_list); - } - if (op->on_consumed) { - op->on_consumed->cb(op->on_consumed->cb_arg, 0, closure_list); - } + if (op->send_ops) + { + grpc_stream_ops_unref_owned_objects (op->send_ops->ops, op->send_ops->nops); + op->on_done_send->cb (op->on_done_send->cb_arg, 0, closure_list); + } + if (op->recv_ops) + { + char status[GPR_LTOA_MIN_BUFSIZE]; + grpc_metadata_batch mdb; + gpr_ltoa (GRPC_STATUS_CANCELLED, status); + calld->status.md = grpc_mdelem_from_strings (chand->mdctx, "grpc-status", status); + calld->details.md = grpc_mdelem_from_strings (chand->mdctx, "grpc-message", "Cancelled"); + calld->status.prev = calld->details.next = NULL; + calld->status.next = &calld->details; + calld->details.prev = &calld->status; + mdb.list.head = &calld->status; + mdb.list.tail = &calld->details; + mdb.garbage.head = mdb.garbage.tail = NULL; + mdb.deadline = gpr_inf_future (GPR_CLOCK_REALTIME); + grpc_sopb_add_metadata (op->recv_ops, mdb); + *op->recv_state = GRPC_STREAM_CLOSED; + op->on_done_recv->cb (op->on_done_recv->cb_arg, 1, closure_list); + } + if (op->on_consumed) + { + op->on_consumed->cb (op->on_consumed->cb_arg, 0, closure_list); + } } -typedef struct { +typedef struct +{ grpc_closure closure; grpc_call_element *elem; } waiting_call; -static void perform_transport_stream_op(grpc_call_element *elem, - grpc_transport_stream_op *op, - int continuation, - grpc_closure_list *closure_list); +static void perform_transport_stream_op (grpc_call_element * elem, grpc_transport_stream_op * op, int continuation, grpc_closure_list * closure_list); -static void continue_with_pick(void *arg, int iomgr_success, - grpc_closure_list *closure_list) { +static void +continue_with_pick (void *arg, int iomgr_success, grpc_closure_list * closure_list) +{ waiting_call *wc = arg; call_data *calld = wc->elem->call_data; - perform_transport_stream_op(wc->elem, &calld->waiting_op, 1, closure_list); - gpr_free(wc); + perform_transport_stream_op (wc->elem, &calld->waiting_op, 1, closure_list); + gpr_free (wc); } -static void add_to_lb_policy_wait_queue_locked_state_config( - grpc_call_element *elem) { +static void +add_to_lb_policy_wait_queue_locked_state_config (grpc_call_element * elem) +{ channel_data *chand = elem->channel_data; - waiting_call *wc = gpr_malloc(sizeof(*wc)); - grpc_closure_init(&wc->closure, continue_with_pick, wc); + waiting_call *wc = gpr_malloc (sizeof (*wc)); + grpc_closure_init (&wc->closure, continue_with_pick, wc); wc->elem = elem; - grpc_closure_list_add(&chand->waiting_for_config_closures, &wc->closure, 1); + grpc_closure_list_add (&chand->waiting_for_config_closures, &wc->closure, 1); } -static int is_empty(void *p, int len) { +static int +is_empty (void *p, int len) +{ char *ptr = p; int i; - for (i = 0; i < len; i++) { - if (ptr[i] != 0) return 0; - } + for (i = 0; i < len; i++) + { + if (ptr[i] != 0) + return 0; + } return 1; } -static void started_call(void *arg, int iomgr_success, - grpc_closure_list *closure_list) { +static void +started_call (void *arg, int iomgr_success, grpc_closure_list * closure_list) +{ call_data *calld = arg; grpc_transport_stream_op op; int have_waiting; - gpr_mu_lock(&calld->mu_state); - if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) { - memset(&op, 0, sizeof(op)); - op.cancel_with_status = GRPC_STATUS_CANCELLED; - gpr_mu_unlock(&calld->mu_state); - grpc_subchannel_call_process_op(calld->subchannel_call, &op, closure_list); - } else if (calld->state == CALL_WAITING_FOR_CALL) { - have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op)); - if (calld->subchannel_call != NULL) { - calld->state = CALL_ACTIVE; - gpr_mu_unlock(&calld->mu_state); - if (have_waiting) { - grpc_subchannel_call_process_op(calld->subchannel_call, - &calld->waiting_op, closure_list); - } - } else { - calld->state = CALL_CANCELLED; - gpr_mu_unlock(&calld->mu_state); - if (have_waiting) { - handle_op_after_cancellation(calld->elem, &calld->waiting_op, - closure_list); - } - } - } else { - GPR_ASSERT(calld->state == CALL_CANCELLED); - gpr_mu_unlock(&calld->mu_state); - } + gpr_mu_lock (&calld->mu_state); + if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) + { + memset (&op, 0, sizeof (op)); + op.cancel_with_status = GRPC_STATUS_CANCELLED; + gpr_mu_unlock (&calld->mu_state); + grpc_subchannel_call_process_op (calld->subchannel_call, &op, closure_list); + } + else if (calld->state == CALL_WAITING_FOR_CALL) + { + have_waiting = !is_empty (&calld->waiting_op, sizeof (calld->waiting_op)); + if (calld->subchannel_call != NULL) + { + calld->state = CALL_ACTIVE; + gpr_mu_unlock (&calld->mu_state); + if (have_waiting) + { + grpc_subchannel_call_process_op (calld->subchannel_call, &calld->waiting_op, closure_list); + } + } + else + { + calld->state = CALL_CANCELLED; + gpr_mu_unlock (&calld->mu_state); + if (have_waiting) + { + handle_op_after_cancellation (calld->elem, &calld->waiting_op, closure_list); + } + } + } + else + { + GPR_ASSERT (calld->state == CALL_CANCELLED); + gpr_mu_unlock (&calld->mu_state); + } } -static void picked_target(void *arg, int iomgr_success, - grpc_closure_list *closure_list) { +static void +picked_target (void *arg, int iomgr_success, grpc_closure_list * closure_list) +{ call_data *calld = arg; grpc_pollset *pollset; - if (calld->picked_channel == NULL) { - /* treat this like a cancellation */ - calld->waiting_op.cancel_with_status = GRPC_STATUS_UNAVAILABLE; - perform_transport_stream_op(calld->elem, &calld->waiting_op, 1, - closure_list); - } else { - gpr_mu_lock(&calld->mu_state); - if (calld->state == CALL_CANCELLED) { - gpr_mu_unlock(&calld->mu_state); - handle_op_after_cancellation(calld->elem, &calld->waiting_op, - closure_list); - } else { - GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK); - calld->state = CALL_WAITING_FOR_CALL; - pollset = calld->waiting_op.bind_pollset; - gpr_mu_unlock(&calld->mu_state); - grpc_closure_init(&calld->async_setup_task, started_call, calld); - grpc_subchannel_create_call(calld->picked_channel, pollset, - &calld->subchannel_call, - &calld->async_setup_task, closure_list); - } - } + if (calld->picked_channel == NULL) + { + /* treat this like a cancellation */ + calld->waiting_op.cancel_with_status = GRPC_STATUS_UNAVAILABLE; + perform_transport_stream_op (calld->elem, &calld->waiting_op, 1, closure_list); + } + else + { + gpr_mu_lock (&calld->mu_state); + if (calld->state == CALL_CANCELLED) + { + gpr_mu_unlock (&calld->mu_state); + handle_op_after_cancellation (calld->elem, &calld->waiting_op, closure_list); + } + else + { + GPR_ASSERT (calld->state == CALL_WAITING_FOR_PICK); + calld->state = CALL_WAITING_FOR_CALL; + pollset = calld->waiting_op.bind_pollset; + gpr_mu_unlock (&calld->mu_state); + grpc_closure_init (&calld->async_setup_task, started_call, calld); + grpc_subchannel_create_call (calld->picked_channel, pollset, &calld->subchannel_call, &calld->async_setup_task, closure_list); + } + } } -static grpc_closure *merge_into_waiting_op(grpc_call_element *elem, - grpc_transport_stream_op *new_op) { +static grpc_closure * +merge_into_waiting_op (grpc_call_element * elem, grpc_transport_stream_op * new_op) +{ call_data *calld = elem->call_data; grpc_closure *consumed_op = NULL; grpc_transport_stream_op *waiting_op = &calld->waiting_op; - GPR_ASSERT((waiting_op->send_ops != NULL) + (new_op->send_ops != NULL) <= 1); - GPR_ASSERT((waiting_op->recv_ops != NULL) + (new_op->recv_ops != NULL) <= 1); - if (new_op->send_ops != NULL) { - waiting_op->send_ops = new_op->send_ops; - waiting_op->is_last_send = new_op->is_last_send; - waiting_op->on_done_send = new_op->on_done_send; - } - if (new_op->recv_ops != NULL) { - waiting_op->recv_ops = new_op->recv_ops; - waiting_op->recv_state = new_op->recv_state; - waiting_op->on_done_recv = new_op->on_done_recv; - } - if (new_op->on_consumed != NULL) { - if (waiting_op->on_consumed != NULL) { - consumed_op = waiting_op->on_consumed; - } - waiting_op->on_consumed = new_op->on_consumed; - } - if (new_op->cancel_with_status != GRPC_STATUS_OK) { - waiting_op->cancel_with_status = new_op->cancel_with_status; - } + GPR_ASSERT ((waiting_op->send_ops != NULL) + (new_op->send_ops != NULL) <= 1); + GPR_ASSERT ((waiting_op->recv_ops != NULL) + (new_op->recv_ops != NULL) <= 1); + if (new_op->send_ops != NULL) + { + waiting_op->send_ops = new_op->send_ops; + waiting_op->is_last_send = new_op->is_last_send; + waiting_op->on_done_send = new_op->on_done_send; + } + if (new_op->recv_ops != NULL) + { + waiting_op->recv_ops = new_op->recv_ops; + waiting_op->recv_state = new_op->recv_state; + waiting_op->on_done_recv = new_op->on_done_recv; + } + if (new_op->on_consumed != NULL) + { + if (waiting_op->on_consumed != NULL) + { + consumed_op = waiting_op->on_consumed; + } + waiting_op->on_consumed = new_op->on_consumed; + } + if (new_op->cancel_with_status != GRPC_STATUS_OK) + { + waiting_op->cancel_with_status = new_op->cancel_with_status; + } return consumed_op; } -static char *cc_get_peer(grpc_call_element *elem, - grpc_closure_list *closure_list) { +static char * +cc_get_peer (grpc_call_element * elem, grpc_closure_list * closure_list) +{ call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; grpc_subchannel_call *subchannel_call; char *result; - gpr_mu_lock(&calld->mu_state); - if (calld->state == CALL_ACTIVE) { - subchannel_call = calld->subchannel_call; - GRPC_SUBCHANNEL_CALL_REF(subchannel_call, "get_peer"); - gpr_mu_unlock(&calld->mu_state); - result = grpc_subchannel_call_get_peer(subchannel_call, closure_list); - GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "get_peer", closure_list); - return result; - } else { - gpr_mu_unlock(&calld->mu_state); - return grpc_channel_get_target(chand->master); - } + gpr_mu_lock (&calld->mu_state); + if (calld->state == CALL_ACTIVE) + { + subchannel_call = calld->subchannel_call; + GRPC_SUBCHANNEL_CALL_REF (subchannel_call, "get_peer"); + gpr_mu_unlock (&calld->mu_state); + result = grpc_subchannel_call_get_peer (subchannel_call, closure_list); + GRPC_SUBCHANNEL_CALL_UNREF (subchannel_call, "get_peer", closure_list); + return result; + } + else + { + gpr_mu_unlock (&calld->mu_state); + return grpc_channel_get_target (chand->master); + } } -static void perform_transport_stream_op(grpc_call_element *elem, - grpc_transport_stream_op *op, - int continuation, - grpc_closure_list *closure_list) { +static void +perform_transport_stream_op (grpc_call_element * elem, grpc_transport_stream_op * op, int continuation, grpc_closure_list * closure_list) +{ call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; grpc_subchannel_call *subchannel_call; grpc_lb_policy *lb_policy; grpc_transport_stream_op op2; - GPR_ASSERT(elem->filter == &grpc_client_channel_filter); - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + GPR_ASSERT (elem->filter == &grpc_client_channel_filter); + GRPC_CALL_LOG_OP (GPR_INFO, elem, op); - gpr_mu_lock(&calld->mu_state); - switch (calld->state) { + gpr_mu_lock (&calld->mu_state); + switch (calld->state) + { case CALL_ACTIVE: - GPR_ASSERT(!continuation); + GPR_ASSERT (!continuation); subchannel_call = calld->subchannel_call; - gpr_mu_unlock(&calld->mu_state); - grpc_subchannel_call_process_op(subchannel_call, op, closure_list); + gpr_mu_unlock (&calld->mu_state); + grpc_subchannel_call_process_op (subchannel_call, op, closure_list); break; case CALL_CANCELLED: - gpr_mu_unlock(&calld->mu_state); - handle_op_after_cancellation(elem, op, closure_list); + gpr_mu_unlock (&calld->mu_state); + handle_op_after_cancellation (elem, op, closure_list); break; case CALL_WAITING_FOR_SEND: - GPR_ASSERT(!continuation); - grpc_closure_list_add(closure_list, merge_into_waiting_op(elem, op), 1); - if (!calld->waiting_op.send_ops && - calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) { - gpr_mu_unlock(&calld->mu_state); - break; - } + GPR_ASSERT (!continuation); + grpc_closure_list_add (closure_list, merge_into_waiting_op (elem, op), 1); + if (!calld->waiting_op.send_ops && calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) + { + gpr_mu_unlock (&calld->mu_state); + break; + } *op = calld->waiting_op; - memset(&calld->waiting_op, 0, sizeof(calld->waiting_op)); + memset (&calld->waiting_op, 0, sizeof (calld->waiting_op)); continuation = 1; - /* fall through */ + /* fall through */ case CALL_WAITING_FOR_CONFIG: case CALL_WAITING_FOR_PICK: case CALL_WAITING_FOR_CALL: - if (!continuation) { - if (op->cancel_with_status != GRPC_STATUS_OK) { - calld->state = CALL_CANCELLED; - op2 = calld->waiting_op; - memset(&calld->waiting_op, 0, sizeof(calld->waiting_op)); - if (op->on_consumed) { - calld->waiting_op.on_consumed = op->on_consumed; - op->on_consumed = NULL; - } else if (op2.on_consumed) { - calld->waiting_op.on_consumed = op2.on_consumed; - op2.on_consumed = NULL; - } - gpr_mu_unlock(&calld->mu_state); - handle_op_after_cancellation(elem, op, closure_list); - handle_op_after_cancellation(elem, &op2, closure_list); - } else { - grpc_closure_list_add(closure_list, merge_into_waiting_op(elem, op), - 1); - gpr_mu_unlock(&calld->mu_state); - } - break; - } - /* fall through */ + if (!continuation) + { + if (op->cancel_with_status != GRPC_STATUS_OK) + { + calld->state = CALL_CANCELLED; + op2 = calld->waiting_op; + memset (&calld->waiting_op, 0, sizeof (calld->waiting_op)); + if (op->on_consumed) + { + calld->waiting_op.on_consumed = op->on_consumed; + op->on_consumed = NULL; + } + else if (op2.on_consumed) + { + calld->waiting_op.on_consumed = op2.on_consumed; + op2.on_consumed = NULL; + } + gpr_mu_unlock (&calld->mu_state); + handle_op_after_cancellation (elem, op, closure_list); + handle_op_after_cancellation (elem, &op2, closure_list); + } + else + { + grpc_closure_list_add (closure_list, merge_into_waiting_op (elem, op), 1); + gpr_mu_unlock (&calld->mu_state); + } + break; + } + /* fall through */ case CALL_CREATED: - if (op->cancel_with_status != GRPC_STATUS_OK) { - calld->state = CALL_CANCELLED; - gpr_mu_unlock(&calld->mu_state); - handle_op_after_cancellation(elem, op, closure_list); - } else { - calld->waiting_op = *op; - - if (op->send_ops == NULL) { - /* need to have some send ops before we can select the - lb target */ - calld->state = CALL_WAITING_FOR_SEND; - gpr_mu_unlock(&calld->mu_state); - } else { - gpr_mu_lock(&chand->mu_config); - lb_policy = chand->lb_policy; - if (lb_policy) { - grpc_transport_stream_op *op = &calld->waiting_op; - grpc_pollset *bind_pollset = op->bind_pollset; - grpc_metadata_batch *initial_metadata = - &op->send_ops->ops[0].data.metadata; - GRPC_LB_POLICY_REF(lb_policy, "pick"); - gpr_mu_unlock(&chand->mu_config); - calld->state = CALL_WAITING_FOR_PICK; - - GPR_ASSERT(op->bind_pollset); - GPR_ASSERT(op->send_ops); - GPR_ASSERT(op->send_ops->nops >= 1); - GPR_ASSERT(op->send_ops->ops[0].type == GRPC_OP_METADATA); - gpr_mu_unlock(&calld->mu_state); - - grpc_closure_init(&calld->async_setup_task, picked_target, calld); - grpc_lb_policy_pick(lb_policy, bind_pollset, initial_metadata, - &calld->picked_channel, - &calld->async_setup_task, closure_list); - - GRPC_LB_POLICY_UNREF(lb_policy, "pick", closure_list); - } else if (chand->resolver != NULL) { - calld->state = CALL_WAITING_FOR_CONFIG; - add_to_lb_policy_wait_queue_locked_state_config(elem); - if (!chand->started_resolving && chand->resolver != NULL) { - GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); - chand->started_resolving = 1; - grpc_resolver_next(chand->resolver, - &chand->incoming_configuration, - &chand->on_config_changed, closure_list); - } - gpr_mu_unlock(&chand->mu_config); - gpr_mu_unlock(&calld->mu_state); - } else { - calld->state = CALL_CANCELLED; - gpr_mu_unlock(&chand->mu_config); - gpr_mu_unlock(&calld->mu_state); - handle_op_after_cancellation(elem, op, closure_list); - } - } - } + if (op->cancel_with_status != GRPC_STATUS_OK) + { + calld->state = CALL_CANCELLED; + gpr_mu_unlock (&calld->mu_state); + handle_op_after_cancellation (elem, op, closure_list); + } + else + { + calld->waiting_op = *op; + + if (op->send_ops == NULL) + { + /* need to have some send ops before we can select the + lb target */ + calld->state = CALL_WAITING_FOR_SEND; + gpr_mu_unlock (&calld->mu_state); + } + else + { + gpr_mu_lock (&chand->mu_config); + lb_policy = chand->lb_policy; + if (lb_policy) + { + grpc_transport_stream_op *op = &calld->waiting_op; + grpc_pollset *bind_pollset = op->bind_pollset; + grpc_metadata_batch *initial_metadata = &op->send_ops->ops[0].data.metadata; + GRPC_LB_POLICY_REF (lb_policy, "pick"); + gpr_mu_unlock (&chand->mu_config); + calld->state = CALL_WAITING_FOR_PICK; + + GPR_ASSERT (op->bind_pollset); + GPR_ASSERT (op->send_ops); + GPR_ASSERT (op->send_ops->nops >= 1); + GPR_ASSERT (op->send_ops->ops[0].type == GRPC_OP_METADATA); + gpr_mu_unlock (&calld->mu_state); + + grpc_closure_init (&calld->async_setup_task, picked_target, calld); + grpc_lb_policy_pick (lb_policy, bind_pollset, initial_metadata, &calld->picked_channel, &calld->async_setup_task, closure_list); + + GRPC_LB_POLICY_UNREF (lb_policy, "pick", closure_list); + } + else if (chand->resolver != NULL) + { + calld->state = CALL_WAITING_FOR_CONFIG; + add_to_lb_policy_wait_queue_locked_state_config (elem); + if (!chand->started_resolving && chand->resolver != NULL) + { + GRPC_CHANNEL_INTERNAL_REF (chand->master, "resolver"); + chand->started_resolving = 1; + grpc_resolver_next (chand->resolver, &chand->incoming_configuration, &chand->on_config_changed, closure_list); + } + gpr_mu_unlock (&chand->mu_config); + gpr_mu_unlock (&calld->mu_state); + } + else + { + calld->state = CALL_CANCELLED; + gpr_mu_unlock (&chand->mu_config); + gpr_mu_unlock (&calld->mu_state); + handle_op_after_cancellation (elem, op, closure_list); + } + } + } break; - } + } } -static void cc_start_transport_stream_op(grpc_call_element *elem, - grpc_transport_stream_op *op, - grpc_closure_list *closure_list) { - perform_transport_stream_op(elem, op, 0, closure_list); +static void +cc_start_transport_stream_op (grpc_call_element * elem, grpc_transport_stream_op * op, grpc_closure_list * closure_list) +{ + perform_transport_stream_op (elem, op, 0, closure_list); } -static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, - grpc_connectivity_state current_state, - grpc_closure_list *cl); +static void watch_lb_policy (channel_data * chand, grpc_lb_policy * lb_policy, grpc_connectivity_state current_state, grpc_closure_list * cl); -static void on_lb_policy_state_changed_locked(lb_policy_connectivity_watcher *w, - grpc_closure_list *cl) { +static void +on_lb_policy_state_changed_locked (lb_policy_connectivity_watcher * w, grpc_closure_list * cl) +{ /* check if the notification is for a stale policy */ - if (w->lb_policy != w->chand->lb_policy) return; + if (w->lb_policy != w->chand->lb_policy) + return; - grpc_connectivity_state_set(&w->chand->state_tracker, w->state, "lb_changed", - cl); - if (w->state != GRPC_CHANNEL_FATAL_FAILURE) { - watch_lb_policy(w->chand, w->lb_policy, w->state, cl); - } + grpc_connectivity_state_set (&w->chand->state_tracker, w->state, "lb_changed", cl); + if (w->state != GRPC_CHANNEL_FATAL_FAILURE) + { + watch_lb_policy (w->chand, w->lb_policy, w->state, cl); + } } -static void on_lb_policy_state_changed(void *arg, int iomgr_success, - grpc_closure_list *closure_list) { +static void +on_lb_policy_state_changed (void *arg, int iomgr_success, grpc_closure_list * closure_list) +{ lb_policy_connectivity_watcher *w = arg; - gpr_mu_lock(&w->chand->mu_config); - on_lb_policy_state_changed_locked(w, closure_list); - gpr_mu_unlock(&w->chand->mu_config); + gpr_mu_lock (&w->chand->mu_config); + on_lb_policy_state_changed_locked (w, closure_list); + gpr_mu_unlock (&w->chand->mu_config); - GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy", - closure_list); - gpr_free(w); + GRPC_CHANNEL_INTERNAL_UNREF (w->chand->master, "watch_lb_policy", closure_list); + gpr_free (w); } -static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, - grpc_connectivity_state current_state, - grpc_closure_list *closure_list) { - lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w)); - GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy"); +static void +watch_lb_policy (channel_data * chand, grpc_lb_policy * lb_policy, grpc_connectivity_state current_state, grpc_closure_list * closure_list) +{ + lb_policy_connectivity_watcher *w = gpr_malloc (sizeof (*w)); + GRPC_CHANNEL_INTERNAL_REF (chand->master, "watch_lb_policy"); w->chand = chand; - grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w); + grpc_closure_init (&w->on_changed, on_lb_policy_state_changed, w); w->state = current_state; w->lb_policy = lb_policy; - grpc_lb_policy_notify_on_state_change(lb_policy, &w->state, &w->on_changed, - closure_list); + grpc_lb_policy_notify_on_state_change (lb_policy, &w->state, &w->on_changed, closure_list); } -static void cc_on_config_changed(void *arg, int iomgr_success, - grpc_closure_list *closure_list) { +static void +cc_on_config_changed (void *arg, int iomgr_success, grpc_closure_list * closure_list) +{ channel_data *chand = arg; grpc_lb_policy *lb_policy = NULL; grpc_lb_policy *old_lb_policy; @@ -491,298 +534,313 @@ static void cc_on_config_changed(void *arg, int iomgr_success, grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE; int exit_idle = 0; - if (chand->incoming_configuration != NULL) { - lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration); - if (lb_policy != NULL) { - GRPC_LB_POLICY_REF(lb_policy, "channel"); - GRPC_LB_POLICY_REF(lb_policy, "config_change"); - state = grpc_lb_policy_check_connectivity(lb_policy, closure_list); + if (chand->incoming_configuration != NULL) + { + lb_policy = grpc_client_config_get_lb_policy (chand->incoming_configuration); + if (lb_policy != NULL) + { + GRPC_LB_POLICY_REF (lb_policy, "channel"); + GRPC_LB_POLICY_REF (lb_policy, "config_change"); + state = grpc_lb_policy_check_connectivity (lb_policy, closure_list); + } + + grpc_client_config_unref (chand->incoming_configuration, closure_list); } - grpc_client_config_unref(chand->incoming_configuration, closure_list); - } - chand->incoming_configuration = NULL; - gpr_mu_lock(&chand->mu_config); + gpr_mu_lock (&chand->mu_config); old_lb_policy = chand->lb_policy; chand->lb_policy = lb_policy; - if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) { - grpc_closure_list_move(&chand->waiting_for_config_closures, closure_list); - } - if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) { - GRPC_LB_POLICY_REF(lb_policy, "exit_idle"); - exit_idle = 1; - chand->exit_idle_when_lb_policy_arrives = 0; - } - - if (iomgr_success && chand->resolver) { - grpc_resolver *resolver = chand->resolver; - GRPC_RESOLVER_REF(resolver, "channel-next"); - grpc_connectivity_state_set(&chand->state_tracker, state, "new_lb+resolver", - closure_list); - if (lb_policy != NULL) { - watch_lb_policy(chand, lb_policy, state, closure_list); - } - gpr_mu_unlock(&chand->mu_config); - GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); - grpc_resolver_next(resolver, &chand->incoming_configuration, - &chand->on_config_changed, closure_list); - GRPC_RESOLVER_UNREF(resolver, "channel-next", closure_list); - } else { - old_resolver = chand->resolver; - chand->resolver = NULL; - grpc_connectivity_state_set(&chand->state_tracker, - GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone", - closure_list); - gpr_mu_unlock(&chand->mu_config); - if (old_resolver != NULL) { - grpc_resolver_shutdown(old_resolver, closure_list); - GRPC_RESOLVER_UNREF(old_resolver, "channel", closure_list); - } - } - - if (exit_idle) { - grpc_lb_policy_exit_idle(lb_policy, closure_list); - GRPC_LB_POLICY_UNREF(lb_policy, "exit_idle", closure_list); - } - - if (old_lb_policy != NULL) { - grpc_lb_policy_shutdown(old_lb_policy, closure_list); - GRPC_LB_POLICY_UNREF(old_lb_policy, "channel", closure_list); - } - - if (lb_policy != NULL) { - GRPC_LB_POLICY_UNREF(lb_policy, "config_change", closure_list); - } - - GRPC_CHANNEL_INTERNAL_UNREF(chand->master, "resolver", closure_list); + if (lb_policy != NULL || chand->resolver == NULL /* disconnected */ ) + { + grpc_closure_list_move (&chand->waiting_for_config_closures, closure_list); + } + if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) + { + GRPC_LB_POLICY_REF (lb_policy, "exit_idle"); + exit_idle = 1; + chand->exit_idle_when_lb_policy_arrives = 0; + } + + if (iomgr_success && chand->resolver) + { + grpc_resolver *resolver = chand->resolver; + GRPC_RESOLVER_REF (resolver, "channel-next"); + grpc_connectivity_state_set (&chand->state_tracker, state, "new_lb+resolver", closure_list); + if (lb_policy != NULL) + { + watch_lb_policy (chand, lb_policy, state, closure_list); + } + gpr_mu_unlock (&chand->mu_config); + GRPC_CHANNEL_INTERNAL_REF (chand->master, "resolver"); + grpc_resolver_next (resolver, &chand->incoming_configuration, &chand->on_config_changed, closure_list); + GRPC_RESOLVER_UNREF (resolver, "channel-next", closure_list); + } + else + { + old_resolver = chand->resolver; + chand->resolver = NULL; + grpc_connectivity_state_set (&chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone", closure_list); + gpr_mu_unlock (&chand->mu_config); + if (old_resolver != NULL) + { + grpc_resolver_shutdown (old_resolver, closure_list); + GRPC_RESOLVER_UNREF (old_resolver, "channel", closure_list); + } + } + + if (exit_idle) + { + grpc_lb_policy_exit_idle (lb_policy, closure_list); + GRPC_LB_POLICY_UNREF (lb_policy, "exit_idle", closure_list); + } + + if (old_lb_policy != NULL) + { + grpc_lb_policy_shutdown (old_lb_policy, closure_list); + GRPC_LB_POLICY_UNREF (old_lb_policy, "channel", closure_list); + } + + if (lb_policy != NULL) + { + GRPC_LB_POLICY_UNREF (lb_policy, "config_change", closure_list); + } + + GRPC_CHANNEL_INTERNAL_UNREF (chand->master, "resolver", closure_list); } -static void cc_start_transport_op(grpc_channel_element *elem, - grpc_transport_op *op, - grpc_closure_list *closure_list) { +static void +cc_start_transport_op (grpc_channel_element * elem, grpc_transport_op * op, grpc_closure_list * closure_list) +{ grpc_lb_policy *lb_policy = NULL; channel_data *chand = elem->channel_data; grpc_resolver *destroy_resolver = NULL; - grpc_closure_list_add(closure_list, op->on_consumed, 1); - - GPR_ASSERT(op->set_accept_stream == NULL); - GPR_ASSERT(op->bind_pollset == NULL); - - gpr_mu_lock(&chand->mu_config); - if (op->on_connectivity_state_change != NULL) { - grpc_connectivity_state_notify_on_state_change( - &chand->state_tracker, op->connectivity_state, - op->on_connectivity_state_change, closure_list); - op->on_connectivity_state_change = NULL; - op->connectivity_state = NULL; - } - - if (!is_empty(op, sizeof(*op))) { - lb_policy = chand->lb_policy; - if (lb_policy) { - GRPC_LB_POLICY_REF(lb_policy, "broadcast"); - } - } - - if (op->disconnect && chand->resolver != NULL) { - grpc_connectivity_state_set(&chand->state_tracker, - GRPC_CHANNEL_FATAL_FAILURE, "disconnect", - closure_list); - destroy_resolver = chand->resolver; - chand->resolver = NULL; - if (chand->lb_policy != NULL) { - grpc_lb_policy_shutdown(chand->lb_policy, closure_list); - GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel", closure_list); - chand->lb_policy = NULL; - } - } - gpr_mu_unlock(&chand->mu_config); - - if (destroy_resolver) { - grpc_resolver_shutdown(destroy_resolver, closure_list); - GRPC_RESOLVER_UNREF(destroy_resolver, "channel", closure_list); - } - - if (lb_policy) { - grpc_lb_policy_broadcast(lb_policy, op, closure_list); - GRPC_LB_POLICY_UNREF(lb_policy, "broadcast", closure_list); - } + grpc_closure_list_add (closure_list, op->on_consumed, 1); + + GPR_ASSERT (op->set_accept_stream == NULL); + GPR_ASSERT (op->bind_pollset == NULL); + + gpr_mu_lock (&chand->mu_config); + if (op->on_connectivity_state_change != NULL) + { + grpc_connectivity_state_notify_on_state_change (&chand->state_tracker, op->connectivity_state, op->on_connectivity_state_change, closure_list); + op->on_connectivity_state_change = NULL; + op->connectivity_state = NULL; + } + + if (!is_empty (op, sizeof (*op))) + { + lb_policy = chand->lb_policy; + if (lb_policy) + { + GRPC_LB_POLICY_REF (lb_policy, "broadcast"); + } + } + + if (op->disconnect && chand->resolver != NULL) + { + grpc_connectivity_state_set (&chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "disconnect", closure_list); + destroy_resolver = chand->resolver; + chand->resolver = NULL; + if (chand->lb_policy != NULL) + { + grpc_lb_policy_shutdown (chand->lb_policy, closure_list); + GRPC_LB_POLICY_UNREF (chand->lb_policy, "channel", closure_list); + chand->lb_policy = NULL; + } + } + gpr_mu_unlock (&chand->mu_config); + + if (destroy_resolver) + { + grpc_resolver_shutdown (destroy_resolver, closure_list); + GRPC_RESOLVER_UNREF (destroy_resolver, "channel", closure_list); + } + + if (lb_policy) + { + grpc_lb_policy_broadcast (lb_policy, op, closure_list); + GRPC_LB_POLICY_UNREF (lb_policy, "broadcast", closure_list); + } } /* Constructor for call_data */ -static void init_call_elem(grpc_call_element *elem, - const void *server_transport_data, - grpc_transport_stream_op *initial_op, - grpc_closure_list *closure_list) { +static void +init_call_elem (grpc_call_element * elem, const void *server_transport_data, grpc_transport_stream_op * initial_op, grpc_closure_list * closure_list) +{ call_data *calld = elem->call_data; /* TODO(ctiller): is there something useful we can do here? */ - GPR_ASSERT(initial_op == NULL); + GPR_ASSERT (initial_op == NULL); - GPR_ASSERT(elem->filter == &grpc_client_channel_filter); - GPR_ASSERT(server_transport_data == NULL); - gpr_mu_init(&calld->mu_state); + GPR_ASSERT (elem->filter == &grpc_client_channel_filter); + GPR_ASSERT (server_transport_data == NULL); + gpr_mu_init (&calld->mu_state); calld->elem = elem; calld->state = CALL_CREATED; - calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); + calld->deadline = gpr_inf_future (GPR_CLOCK_REALTIME); } /* Destructor for call_data */ -static void destroy_call_elem(grpc_call_element *elem, - grpc_closure_list *closure_list) { +static void +destroy_call_elem (grpc_call_element * elem, grpc_closure_list * closure_list) +{ call_data *calld = elem->call_data; grpc_subchannel_call *subchannel_call; /* if the call got activated, we need to destroy the child stack also, and remove it from the in-flight requests tracked by the child_entry we picked */ - gpr_mu_lock(&calld->mu_state); - switch (calld->state) { + gpr_mu_lock (&calld->mu_state); + switch (calld->state) + { case CALL_ACTIVE: subchannel_call = calld->subchannel_call; - gpr_mu_unlock(&calld->mu_state); - GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "client_channel", - closure_list); + gpr_mu_unlock (&calld->mu_state); + GRPC_SUBCHANNEL_CALL_UNREF (subchannel_call, "client_channel", closure_list); break; case CALL_CREATED: case CALL_CANCELLED: - gpr_mu_unlock(&calld->mu_state); + gpr_mu_unlock (&calld->mu_state); break; case CALL_WAITING_FOR_PICK: case CALL_WAITING_FOR_CONFIG: case CALL_WAITING_FOR_CALL: case CALL_WAITING_FOR_SEND: - gpr_log(GPR_ERROR, "should never reach here"); - abort(); + gpr_log (GPR_ERROR, "should never reach here"); + abort (); break; - } + } } /* Constructor for channel_data */ -static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, - const grpc_channel_args *args, - grpc_mdctx *metadata_context, int is_first, - int is_last, grpc_closure_list *closure_list) { +static void +init_channel_elem (grpc_channel_element * elem, grpc_channel * master, const grpc_channel_args * args, grpc_mdctx * metadata_context, int is_first, int is_last, grpc_closure_list * closure_list) +{ channel_data *chand = elem->channel_data; - memset(chand, 0, sizeof(*chand)); + memset (chand, 0, sizeof (*chand)); - GPR_ASSERT(is_last); - GPR_ASSERT(elem->filter == &grpc_client_channel_filter); + GPR_ASSERT (is_last); + GPR_ASSERT (elem->filter == &grpc_client_channel_filter); - gpr_mu_init(&chand->mu_config); + gpr_mu_init (&chand->mu_config); chand->mdctx = metadata_context; chand->master = master; - grpc_pollset_set_init(&chand->pollset_set); - grpc_closure_init(&chand->on_config_changed, cc_on_config_changed, chand); + grpc_pollset_set_init (&chand->pollset_set); + grpc_closure_init (&chand->on_config_changed, cc_on_config_changed, chand); - grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, - "client_channel"); + grpc_connectivity_state_init (&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); } /* Destructor for channel_data */ -static void destroy_channel_elem(grpc_channel_element *elem, - grpc_closure_list *closure_list) { +static void +destroy_channel_elem (grpc_channel_element * elem, grpc_closure_list * closure_list) +{ channel_data *chand = elem->channel_data; - if (chand->resolver != NULL) { - grpc_resolver_shutdown(chand->resolver, closure_list); - GRPC_RESOLVER_UNREF(chand->resolver, "channel", closure_list); - } - if (chand->lb_policy != NULL) { - GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel", closure_list); - } - grpc_connectivity_state_destroy(&chand->state_tracker, closure_list); - grpc_pollset_set_destroy(&chand->pollset_set); - gpr_mu_destroy(&chand->mu_config); + if (chand->resolver != NULL) + { + grpc_resolver_shutdown (chand->resolver, closure_list); + GRPC_RESOLVER_UNREF (chand->resolver, "channel", closure_list); + } + if (chand->lb_policy != NULL) + { + GRPC_LB_POLICY_UNREF (chand->lb_policy, "channel", closure_list); + } + grpc_connectivity_state_destroy (&chand->state_tracker, closure_list); + grpc_pollset_set_destroy (&chand->pollset_set); + gpr_mu_destroy (&chand->mu_config); } const grpc_channel_filter grpc_client_channel_filter = { - cc_start_transport_stream_op, - cc_start_transport_op, - sizeof(call_data), - init_call_elem, - destroy_call_elem, - sizeof(channel_data), - init_channel_elem, - destroy_channel_elem, - cc_get_peer, - "client-channel", + cc_start_transport_stream_op, + cc_start_transport_op, + sizeof (call_data), + init_call_elem, + destroy_call_elem, + sizeof (channel_data), + init_channel_elem, + destroy_channel_elem, + cc_get_peer, + "client-channel", }; -void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack, - grpc_resolver *resolver, - grpc_closure_list *closure_list) { +void +grpc_client_channel_set_resolver (grpc_channel_stack * channel_stack, grpc_resolver * resolver, grpc_closure_list * closure_list) +{ /* post construction initialization: set the transport setup pointer */ - grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack); + grpc_channel_element *elem = grpc_channel_stack_last_element (channel_stack); channel_data *chand = elem->channel_data; - gpr_mu_lock(&chand->mu_config); - GPR_ASSERT(!chand->resolver); + gpr_mu_lock (&chand->mu_config); + GPR_ASSERT (!chand->resolver); chand->resolver = resolver; - GRPC_RESOLVER_REF(resolver, "channel"); - if (!grpc_closure_list_empty(chand->waiting_for_config_closures) || - chand->exit_idle_when_lb_policy_arrives) { - chand->started_resolving = 1; - GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); - grpc_resolver_next(resolver, &chand->incoming_configuration, - &chand->on_config_changed, closure_list); - } - gpr_mu_unlock(&chand->mu_config); + GRPC_RESOLVER_REF (resolver, "channel"); + if (!grpc_closure_list_empty (chand->waiting_for_config_closures) || chand->exit_idle_when_lb_policy_arrives) + { + chand->started_resolving = 1; + GRPC_CHANNEL_INTERNAL_REF (chand->master, "resolver"); + grpc_resolver_next (resolver, &chand->incoming_configuration, &chand->on_config_changed, closure_list); + } + gpr_mu_unlock (&chand->mu_config); } -grpc_connectivity_state grpc_client_channel_check_connectivity_state( - grpc_channel_element *elem, int try_to_connect, - grpc_closure_list *closure_list) { +grpc_connectivity_state +grpc_client_channel_check_connectivity_state (grpc_channel_element * elem, int try_to_connect, grpc_closure_list * closure_list) +{ channel_data *chand = elem->channel_data; grpc_connectivity_state out; - gpr_mu_lock(&chand->mu_config); - out = grpc_connectivity_state_check(&chand->state_tracker); - if (out == GRPC_CHANNEL_IDLE && try_to_connect) { - if (chand->lb_policy != NULL) { - grpc_lb_policy_exit_idle(chand->lb_policy, closure_list); - } else { - chand->exit_idle_when_lb_policy_arrives = 1; - if (!chand->started_resolving && chand->resolver != NULL) { - GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); - chand->started_resolving = 1; - grpc_resolver_next(chand->resolver, &chand->incoming_configuration, - &chand->on_config_changed, closure_list); - } - } - } - gpr_mu_unlock(&chand->mu_config); + gpr_mu_lock (&chand->mu_config); + out = grpc_connectivity_state_check (&chand->state_tracker); + if (out == GRPC_CHANNEL_IDLE && try_to_connect) + { + if (chand->lb_policy != NULL) + { + grpc_lb_policy_exit_idle (chand->lb_policy, closure_list); + } + else + { + chand->exit_idle_when_lb_policy_arrives = 1; + if (!chand->started_resolving && chand->resolver != NULL) + { + GRPC_CHANNEL_INTERNAL_REF (chand->master, "resolver"); + chand->started_resolving = 1; + grpc_resolver_next (chand->resolver, &chand->incoming_configuration, &chand->on_config_changed, closure_list); + } + } + } + gpr_mu_unlock (&chand->mu_config); return out; } -void grpc_client_channel_watch_connectivity_state( - grpc_channel_element *elem, grpc_connectivity_state *state, - grpc_closure *on_complete, grpc_closure_list *closure_list) { +void +grpc_client_channel_watch_connectivity_state (grpc_channel_element * elem, grpc_connectivity_state * state, grpc_closure * on_complete, grpc_closure_list * closure_list) +{ channel_data *chand = elem->channel_data; - gpr_mu_lock(&chand->mu_config); - grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, state, - on_complete, closure_list); - gpr_mu_unlock(&chand->mu_config); + gpr_mu_lock (&chand->mu_config); + grpc_connectivity_state_notify_on_state_change (&chand->state_tracker, state, on_complete, closure_list); + gpr_mu_unlock (&chand->mu_config); } -grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set( - grpc_channel_element *elem) { +grpc_pollset_set * +grpc_client_channel_get_connecting_pollset_set (grpc_channel_element * elem) +{ channel_data *chand = elem->channel_data; return &chand->pollset_set; } -void grpc_client_channel_add_interested_party(grpc_channel_element *elem, - grpc_pollset *pollset, - grpc_closure_list *closure_list) { +void +grpc_client_channel_add_interested_party (grpc_channel_element * elem, grpc_pollset * pollset, grpc_closure_list * closure_list) +{ channel_data *chand = elem->channel_data; - grpc_pollset_set_add_pollset(&chand->pollset_set, pollset, closure_list); + grpc_pollset_set_add_pollset (&chand->pollset_set, pollset, closure_list); } -void grpc_client_channel_del_interested_party(grpc_channel_element *elem, - grpc_pollset *pollset, - grpc_closure_list *closure_list) { +void +grpc_client_channel_del_interested_party (grpc_channel_element * elem, grpc_pollset * pollset, grpc_closure_list * closure_list) +{ channel_data *chand = elem->channel_data; - grpc_pollset_set_del_pollset(&chand->pollset_set, pollset, closure_list); + grpc_pollset_set_del_pollset (&chand->pollset_set, pollset, closure_list); } diff --git a/src/core/channel/client_channel.h b/src/core/channel/client_channel.h index d200f6e4b0..468cf5ef3e 100644 --- a/src/core/channel/client_channel.h +++ b/src/core/channel/client_channel.h @@ -49,26 +49,15 @@ extern const grpc_channel_filter grpc_client_channel_filter; /* post-construction initializer to let the client channel know which transport setup it should cancel upon destruction, or initiate when it needs a connection */ -void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack, - grpc_resolver *resolver, - grpc_closure_list *closure_list); +void grpc_client_channel_set_resolver (grpc_channel_stack * channel_stack, grpc_resolver * resolver, grpc_closure_list * closure_list); -grpc_connectivity_state grpc_client_channel_check_connectivity_state( - grpc_channel_element *elem, int try_to_connect, - grpc_closure_list *closure_list); +grpc_connectivity_state grpc_client_channel_check_connectivity_state (grpc_channel_element * elem, int try_to_connect, grpc_closure_list * closure_list); -void grpc_client_channel_watch_connectivity_state( - grpc_channel_element *elem, grpc_connectivity_state *state, - grpc_closure *on_complete, grpc_closure_list *closure_list); +void grpc_client_channel_watch_connectivity_state (grpc_channel_element * elem, grpc_connectivity_state * state, grpc_closure * on_complete, grpc_closure_list * closure_list); -grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set( - grpc_channel_element *elem); +grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set (grpc_channel_element * elem); -void grpc_client_channel_add_interested_party(grpc_channel_element *channel, - grpc_pollset *pollset, - grpc_closure_list *closure_list); -void grpc_client_channel_del_interested_party(grpc_channel_element *channel, - grpc_pollset *pollset, - grpc_closure_list *closure_list); +void grpc_client_channel_add_interested_party (grpc_channel_element * channel, grpc_pollset * pollset, grpc_closure_list * closure_list); +void grpc_client_channel_del_interested_party (grpc_channel_element * channel, grpc_pollset * pollset, grpc_closure_list * closure_list); #endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_CHANNEL_H */ diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index 911c689d54..a1c03dc9d9 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -44,13 +44,14 @@ #include "src/core/compression/message_compress.h" #include "src/core/support/string.h" -typedef struct call_data { +typedef struct call_data +{ gpr_slice_buffer slices; /**< Buffers up input slices to be compressed */ grpc_linked_mdelem compression_algorithm_storage; grpc_linked_mdelem accept_encoding_storage; - gpr_uint32 - remaining_slice_bytes; /**< Input data to be read, as per BEGIN_MESSAGE */ - int written_initial_metadata; /**< Already processed initial md? */ + gpr_uint32 remaining_slice_bytes; + /**< Input data to be read, as per BEGIN_MESSAGE */ + int written_initial_metadata; /**< Already processed initial md? */ /** Compression algorithm we'll try to use. It may be given by incoming * metadata, or by the channel's default compression settings. */ grpc_compression_algorithm compression_algorithm; @@ -58,7 +59,8 @@ typedef struct call_data { int has_compression_algorithm; } call_data; -typedef struct channel_data { +typedef struct channel_data +{ /** Metadata key for the incoming (requested) compression algorithm */ grpc_mdstr *mdstr_request_compression_algorithm_key; /** Metadata key for the outgoing (used) compression algorithm */ @@ -80,59 +82,62 @@ typedef struct channel_data { * larger than the raw input). * * Returns 1 if the data was actually compress and 0 otherwise. */ -static int compress_send_sb(grpc_compression_algorithm algorithm, - gpr_slice_buffer *slices) { +static int +compress_send_sb (grpc_compression_algorithm algorithm, gpr_slice_buffer * slices) +{ int did_compress; gpr_slice_buffer tmp; - gpr_slice_buffer_init(&tmp); - did_compress = grpc_msg_compress(algorithm, slices, &tmp); - if (did_compress) { - gpr_slice_buffer_swap(slices, &tmp); - } - gpr_slice_buffer_destroy(&tmp); + gpr_slice_buffer_init (&tmp); + did_compress = grpc_msg_compress (algorithm, slices, &tmp); + if (did_compress) + { + gpr_slice_buffer_swap (slices, &tmp); + } + gpr_slice_buffer_destroy (&tmp); return did_compress; } /** For each \a md element from the incoming metadata, filter out the entry for * "grpc-encoding", using its value to populate the call data's * compression_algorithm field. */ -static grpc_mdelem *compression_md_filter(void *user_data, grpc_mdelem *md) { +static grpc_mdelem * +compression_md_filter (void *user_data, grpc_mdelem * md) +{ grpc_call_element *elem = user_data; call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; - if (md->key == channeld->mdstr_request_compression_algorithm_key) { - const char *md_c_str = grpc_mdstr_as_c_string(md->value); - if (!grpc_compression_algorithm_parse(md_c_str, strlen(md_c_str), - &calld->compression_algorithm)) { - gpr_log(GPR_ERROR, - "Invalid compression algorithm: '%s' (unknown). Ignoring.", - md_c_str); - calld->compression_algorithm = GRPC_COMPRESS_NONE; - } - if (grpc_compression_options_is_algorithm_enabled( - &channeld->compression_options, calld->compression_algorithm) == - 0) { - gpr_log(GPR_ERROR, - "Invalid compression algorithm: '%s' (previously disabled). " - "Ignoring.", - md_c_str); - calld->compression_algorithm = GRPC_COMPRESS_NONE; + if (md->key == channeld->mdstr_request_compression_algorithm_key) + { + const char *md_c_str = grpc_mdstr_as_c_string (md->value); + if (!grpc_compression_algorithm_parse (md_c_str, strlen (md_c_str), &calld->compression_algorithm)) + { + gpr_log (GPR_ERROR, "Invalid compression algorithm: '%s' (unknown). Ignoring.", md_c_str); + calld->compression_algorithm = GRPC_COMPRESS_NONE; + } + if (grpc_compression_options_is_algorithm_enabled (&channeld->compression_options, calld->compression_algorithm) == 0) + { + gpr_log (GPR_ERROR, "Invalid compression algorithm: '%s' (previously disabled). " "Ignoring.", md_c_str); + calld->compression_algorithm = GRPC_COMPRESS_NONE; + } + calld->has_compression_algorithm = 1; + return NULL; } - calld->has_compression_algorithm = 1; - return NULL; - } return md; } -static int skip_compression(channel_data *channeld, call_data *calld) { - if (calld->has_compression_algorithm) { - if (calld->compression_algorithm == GRPC_COMPRESS_NONE) { - return 1; +static int +skip_compression (channel_data * channeld, call_data * calld) +{ + if (calld->has_compression_algorithm) + { + if (calld->compression_algorithm == GRPC_COMPRESS_NONE) + { + return 1; + } + return 0; /* we have an actual call-specific algorithm */ } - return 0; /* we have an actual call-specific algorithm */ - } /* no per-call compression override */ return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE; } @@ -141,126 +146,127 @@ static int skip_compression(channel_data *channeld, call_data *calld) { * the associated GRPC_OP_BEGIN_MESSAGE accordingly (new compressed length, * flags indicating compression is in effect) and replaces \a send_ops with it. * */ -static void finish_compressed_sopb(grpc_stream_op_buffer *send_ops, - grpc_call_element *elem) { +static void +finish_compressed_sopb (grpc_stream_op_buffer * send_ops, grpc_call_element * elem) +{ size_t i; call_data *calld = elem->call_data; - int new_slices_added = 0; /* GPR_FALSE */ + int new_slices_added = 0; /* GPR_FALSE */ grpc_metadata_batch metadata; grpc_stream_op_buffer new_send_ops; - grpc_sopb_init(&new_send_ops); - - for (i = 0; i < send_ops->nops; i++) { - grpc_stream_op *sop = &send_ops->ops[i]; - switch (sop->type) { - case GRPC_OP_BEGIN_MESSAGE: - GPR_ASSERT(calld->slices.length <= GPR_UINT32_MAX); - grpc_sopb_add_begin_message( - &new_send_ops, (gpr_uint32)calld->slices.length, - sop->data.begin_message.flags | GRPC_WRITE_INTERNAL_COMPRESS); - break; - case GRPC_OP_SLICE: - /* Once we reach the slices section of the original buffer, simply add - * all the new (compressed) slices. We obviously want to do this only - * once, hence the "new_slices_added" guard. */ - if (!new_slices_added) { - size_t j; - for (j = 0; j < calld->slices.count; ++j) { - grpc_sopb_add_slice(&new_send_ops, - gpr_slice_ref(calld->slices.slices[j])); - } - new_slices_added = 1; /* GPR_TRUE */ - } - break; - case GRPC_OP_METADATA: - /* move the metadata to the new buffer. */ - grpc_metadata_batch_move(&metadata, &sop->data.metadata); - grpc_sopb_add_metadata(&new_send_ops, metadata); - break; - case GRPC_NO_OP: - break; + grpc_sopb_init (&new_send_ops); + + for (i = 0; i < send_ops->nops; i++) + { + grpc_stream_op *sop = &send_ops->ops[i]; + switch (sop->type) + { + case GRPC_OP_BEGIN_MESSAGE: + GPR_ASSERT (calld->slices.length <= GPR_UINT32_MAX); + grpc_sopb_add_begin_message (&new_send_ops, (gpr_uint32) calld->slices.length, sop->data.begin_message.flags | GRPC_WRITE_INTERNAL_COMPRESS); + break; + case GRPC_OP_SLICE: + /* Once we reach the slices section of the original buffer, simply add + * all the new (compressed) slices. We obviously want to do this only + * once, hence the "new_slices_added" guard. */ + if (!new_slices_added) + { + size_t j; + for (j = 0; j < calld->slices.count; ++j) + { + grpc_sopb_add_slice (&new_send_ops, gpr_slice_ref (calld->slices.slices[j])); + } + new_slices_added = 1; /* GPR_TRUE */ + } + break; + case GRPC_OP_METADATA: + /* move the metadata to the new buffer. */ + grpc_metadata_batch_move (&metadata, &sop->data.metadata); + grpc_sopb_add_metadata (&new_send_ops, metadata); + break; + case GRPC_NO_OP: + break; + } } - } - grpc_sopb_swap(send_ops, &new_send_ops); - grpc_sopb_destroy(&new_send_ops); + grpc_sopb_swap (send_ops, &new_send_ops); + grpc_sopb_destroy (&new_send_ops); } /** Filter's "main" function, called for any incoming grpc_transport_stream_op * instance that holds a non-zero number of send operations, accesible to this * function in \a send_ops. */ -static void process_send_ops(grpc_call_element *elem, - grpc_stream_op_buffer *send_ops) { +static void +process_send_ops (grpc_call_element * elem, grpc_stream_op_buffer * send_ops) +{ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; size_t i; int did_compress = 0; /* In streaming calls, we need to reset the previously accumulated slices */ - gpr_slice_buffer_reset_and_unref(&calld->slices); - for (i = 0; i < send_ops->nops; ++i) { - grpc_stream_op *sop = &send_ops->ops[i]; - switch (sop->type) { - case GRPC_OP_BEGIN_MESSAGE: - /* buffer up slices until we've processed all the expected ones (as - * given by GRPC_OP_BEGIN_MESSAGE) */ - calld->remaining_slice_bytes = sop->data.begin_message.length; - if (sop->data.begin_message.flags & GRPC_WRITE_NO_COMPRESS) { - calld->has_compression_algorithm = 1; /* GPR_TRUE */ - calld->compression_algorithm = GRPC_COMPRESS_NONE; - } - break; - case GRPC_OP_METADATA: - if (!calld->written_initial_metadata) { - /* Parse incoming request for compression. If any, it'll be available - * at calld->compression_algorithm */ - grpc_metadata_batch_filter(&(sop->data.metadata), - compression_md_filter, elem); - if (!calld->has_compression_algorithm) { - /* If no algorithm was found in the metadata and we aren't - * exceptionally skipping compression, fall back to the channel - * default */ - calld->compression_algorithm = - channeld->default_compression_algorithm; - calld->has_compression_algorithm = 1; /* GPR_TRUE */ - } - /* hint compression algorithm */ - grpc_metadata_batch_add_tail( - &(sop->data.metadata), &calld->compression_algorithm_storage, - GRPC_MDELEM_REF(channeld->mdelem_compression_algorithms - [calld->compression_algorithm])); - - /* convey supported compression algorithms */ - grpc_metadata_batch_add_tail( - &(sop->data.metadata), &calld->accept_encoding_storage, - GRPC_MDELEM_REF(channeld->mdelem_accept_encoding)); - - calld->written_initial_metadata = 1; /* GPR_TRUE */ - } - break; - case GRPC_OP_SLICE: - if (skip_compression(channeld, calld)) continue; - GPR_ASSERT(calld->remaining_slice_bytes > 0); - /* Increase input ref count, gpr_slice_buffer_add takes ownership. */ - gpr_slice_buffer_add(&calld->slices, gpr_slice_ref(sop->data.slice)); - GPR_ASSERT(GPR_SLICE_LENGTH(sop->data.slice) >= - calld->remaining_slice_bytes); - calld->remaining_slice_bytes -= - (gpr_uint32)GPR_SLICE_LENGTH(sop->data.slice); - if (calld->remaining_slice_bytes == 0) { - did_compress = - compress_send_sb(calld->compression_algorithm, &calld->slices); - } - break; - case GRPC_NO_OP: - break; + gpr_slice_buffer_reset_and_unref (&calld->slices); + for (i = 0; i < send_ops->nops; ++i) + { + grpc_stream_op *sop = &send_ops->ops[i]; + switch (sop->type) + { + case GRPC_OP_BEGIN_MESSAGE: + /* buffer up slices until we've processed all the expected ones (as + * given by GRPC_OP_BEGIN_MESSAGE) */ + calld->remaining_slice_bytes = sop->data.begin_message.length; + if (sop->data.begin_message.flags & GRPC_WRITE_NO_COMPRESS) + { + calld->has_compression_algorithm = 1; /* GPR_TRUE */ + calld->compression_algorithm = GRPC_COMPRESS_NONE; + } + break; + case GRPC_OP_METADATA: + if (!calld->written_initial_metadata) + { + /* Parse incoming request for compression. If any, it'll be available + * at calld->compression_algorithm */ + grpc_metadata_batch_filter (&(sop->data.metadata), compression_md_filter, elem); + if (!calld->has_compression_algorithm) + { + /* If no algorithm was found in the metadata and we aren't + * exceptionally skipping compression, fall back to the channel + * default */ + calld->compression_algorithm = channeld->default_compression_algorithm; + calld->has_compression_algorithm = 1; /* GPR_TRUE */ + } + /* hint compression algorithm */ + grpc_metadata_batch_add_tail (&(sop->data.metadata), &calld->compression_algorithm_storage, GRPC_MDELEM_REF (channeld->mdelem_compression_algorithms[calld->compression_algorithm])); + + /* convey supported compression algorithms */ + grpc_metadata_batch_add_tail (&(sop->data.metadata), &calld->accept_encoding_storage, GRPC_MDELEM_REF (channeld->mdelem_accept_encoding)); + + calld->written_initial_metadata = 1; /* GPR_TRUE */ + } + break; + case GRPC_OP_SLICE: + if (skip_compression (channeld, calld)) + continue; + GPR_ASSERT (calld->remaining_slice_bytes > 0); + /* Increase input ref count, gpr_slice_buffer_add takes ownership. */ + gpr_slice_buffer_add (&calld->slices, gpr_slice_ref (sop->data.slice)); + GPR_ASSERT (GPR_SLICE_LENGTH (sop->data.slice) >= calld->remaining_slice_bytes); + calld->remaining_slice_bytes -= (gpr_uint32) GPR_SLICE_LENGTH (sop->data.slice); + if (calld->remaining_slice_bytes == 0) + { + did_compress = compress_send_sb (calld->compression_algorithm, &calld->slices); + } + break; + case GRPC_NO_OP: + break; + } } - } /* Modify the send_ops stream_op_buffer depending on whether compression was * carried out */ - if (did_compress) { - finish_compressed_sopb(send_ops, elem); - } + if (did_compress) + { + finish_compressed_sopb (send_ops, elem); + } } /* Called either: @@ -268,50 +274,52 @@ static void process_send_ops(grpc_call_element *elem, - a network event (or similar) from below, to receive something op contains type and call direction information, in addition to the data that is being sent or received. */ -static void compress_start_transport_stream_op( - grpc_call_element *elem, grpc_transport_stream_op *op, - grpc_closure_list *closure_list) { - if (op->send_ops && op->send_ops->nops > 0) { - process_send_ops(elem, op->send_ops); - } +static void +compress_start_transport_stream_op (grpc_call_element * elem, grpc_transport_stream_op * op, grpc_closure_list * closure_list) +{ + if (op->send_ops && op->send_ops->nops > 0) + { + process_send_ops (elem, op->send_ops); + } /* pass control down the stack */ - grpc_call_next_op(elem, op, closure_list); + grpc_call_next_op (elem, op, closure_list); } /* Constructor for call_data */ -static void init_call_elem(grpc_call_element *elem, - const void *server_transport_data, - grpc_transport_stream_op *initial_op, - grpc_closure_list *closure_list) { +static void +init_call_elem (grpc_call_element * elem, const void *server_transport_data, grpc_transport_stream_op * initial_op, grpc_closure_list * closure_list) +{ /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; /* initialize members */ - gpr_slice_buffer_init(&calld->slices); + gpr_slice_buffer_init (&calld->slices); calld->has_compression_algorithm = 0; - calld->written_initial_metadata = 0; /* GPR_FALSE */ - - if (initial_op) { - if (initial_op->send_ops && initial_op->send_ops->nops > 0) { - process_send_ops(elem, initial_op->send_ops); + calld->written_initial_metadata = 0; /* GPR_FALSE */ + + if (initial_op) + { + if (initial_op->send_ops && initial_op->send_ops->nops > 0) + { + process_send_ops (elem, initial_op->send_ops); + } } - } } /* Destructor for call_data */ -static void destroy_call_elem(grpc_call_element *elem, - grpc_closure_list *closure_list) { +static void +destroy_call_elem (grpc_call_element * elem, grpc_closure_list * closure_list) +{ /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; - gpr_slice_buffer_destroy(&calld->slices); + gpr_slice_buffer_destroy (&calld->slices); } /* Constructor for channel_data */ -static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, - const grpc_channel_args *args, grpc_mdctx *mdctx, - int is_first, int is_last, - grpc_closure_list *closure_list) { +static void +init_channel_elem (grpc_channel_element * elem, grpc_channel * master, const grpc_channel_args * args, grpc_mdctx * mdctx, int is_first, int is_last, grpc_closure_list * closure_list) +{ channel_data *channeld = elem->channel_data; grpc_compression_algorithm algo_idx; const char *supported_algorithms_names[GRPC_COMPRESS_ALGORITHMS_COUNT - 1]; @@ -319,82 +327,72 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, char *accept_encoding_str; size_t accept_encoding_str_len; - grpc_compression_options_init(&channeld->compression_options); - channeld->compression_options.enabled_algorithms_bitset = - (gpr_uint32)grpc_channel_args_compression_algorithm_get_states(args); + grpc_compression_options_init (&channeld->compression_options); + channeld->compression_options.enabled_algorithms_bitset = (gpr_uint32) grpc_channel_args_compression_algorithm_get_states (args); - channeld->default_compression_algorithm = - grpc_channel_args_get_compression_algorithm(args); + channeld->default_compression_algorithm = grpc_channel_args_get_compression_algorithm (args); /* Make sure the default isn't disabled. */ - GPR_ASSERT(grpc_compression_options_is_algorithm_enabled( - &channeld->compression_options, channeld->default_compression_algorithm)); - channeld->compression_options.default_compression_algorithm = - channeld->default_compression_algorithm; - - channeld->mdstr_request_compression_algorithm_key = - grpc_mdstr_from_string(mdctx, GRPC_COMPRESS_REQUEST_ALGORITHM_KEY, 0); - - channeld->mdstr_outgoing_compression_algorithm_key = - grpc_mdstr_from_string(mdctx, "grpc-encoding", 0); - - channeld->mdstr_compression_capabilities_key = - grpc_mdstr_from_string(mdctx, "grpc-accept-encoding", 0); - - for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) { - char *algorithm_name; - /* skip disabled algorithms */ - if (grpc_compression_options_is_algorithm_enabled( - &channeld->compression_options, algo_idx) == 0) { - continue; - } - GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorithm_name) != 0); - channeld->mdelem_compression_algorithms[algo_idx] = - grpc_mdelem_from_metadata_strings( - mdctx, - GRPC_MDSTR_REF(channeld->mdstr_outgoing_compression_algorithm_key), - grpc_mdstr_from_string(mdctx, algorithm_name, 0)); - if (algo_idx > 0) { - supported_algorithms_names[supported_algorithms_idx++] = algorithm_name; + GPR_ASSERT (grpc_compression_options_is_algorithm_enabled (&channeld->compression_options, channeld->default_compression_algorithm)); + channeld->compression_options.default_compression_algorithm = channeld->default_compression_algorithm; + + channeld->mdstr_request_compression_algorithm_key = grpc_mdstr_from_string (mdctx, GRPC_COMPRESS_REQUEST_ALGORITHM_KEY, 0); + + channeld->mdstr_outgoing_compression_algorithm_key = grpc_mdstr_from_string (mdctx, "grpc-encoding", 0); + + channeld->mdstr_compression_capabilities_key = grpc_mdstr_from_string (mdctx, "grpc-accept-encoding", 0); + + for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) + { + char *algorithm_name; + /* skip disabled algorithms */ + if (grpc_compression_options_is_algorithm_enabled (&channeld->compression_options, algo_idx) == 0) + { + continue; + } + GPR_ASSERT (grpc_compression_algorithm_name (algo_idx, &algorithm_name) != 0); + channeld->mdelem_compression_algorithms[algo_idx] = grpc_mdelem_from_metadata_strings (mdctx, GRPC_MDSTR_REF (channeld->mdstr_outgoing_compression_algorithm_key), grpc_mdstr_from_string (mdctx, algorithm_name, 0)); + if (algo_idx > 0) + { + supported_algorithms_names[supported_algorithms_idx++] = algorithm_name; + } } - } /* TODO(dgq): gpr_strjoin_sep could be made to work with statically allocated * arrays, as to avoid the heap allocs */ - accept_encoding_str = - gpr_strjoin_sep(supported_algorithms_names, supported_algorithms_idx, ",", - &accept_encoding_str_len); + accept_encoding_str = gpr_strjoin_sep (supported_algorithms_names, supported_algorithms_idx, ",", &accept_encoding_str_len); - channeld->mdelem_accept_encoding = grpc_mdelem_from_metadata_strings( - mdctx, GRPC_MDSTR_REF(channeld->mdstr_compression_capabilities_key), - grpc_mdstr_from_string(mdctx, accept_encoding_str, 0)); - gpr_free(accept_encoding_str); + channeld->mdelem_accept_encoding = grpc_mdelem_from_metadata_strings (mdctx, GRPC_MDSTR_REF (channeld->mdstr_compression_capabilities_key), grpc_mdstr_from_string (mdctx, accept_encoding_str, 0)); + gpr_free (accept_encoding_str); - GPR_ASSERT(!is_last); + GPR_ASSERT (!is_last); } /* Destructor for channel data */ -static void destroy_channel_elem(grpc_channel_element *elem, - grpc_closure_list *closure_list) { +static void +destroy_channel_elem (grpc_channel_element * elem, grpc_closure_list * closure_list) +{ channel_data *channeld = elem->channel_data; grpc_compression_algorithm algo_idx; - GRPC_MDSTR_UNREF(channeld->mdstr_request_compression_algorithm_key); - GRPC_MDSTR_UNREF(channeld->mdstr_outgoing_compression_algorithm_key); - GRPC_MDSTR_UNREF(channeld->mdstr_compression_capabilities_key); - for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) { - GRPC_MDELEM_UNREF(channeld->mdelem_compression_algorithms[algo_idx]); - } - GRPC_MDELEM_UNREF(channeld->mdelem_accept_encoding); + GRPC_MDSTR_UNREF (channeld->mdstr_request_compression_algorithm_key); + GRPC_MDSTR_UNREF (channeld->mdstr_outgoing_compression_algorithm_key); + GRPC_MDSTR_UNREF (channeld->mdstr_compression_capabilities_key); + for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) + { + GRPC_MDELEM_UNREF (channeld->mdelem_compression_algorithms[algo_idx]); + } + GRPC_MDELEM_UNREF (channeld->mdelem_accept_encoding); } const grpc_channel_filter grpc_compress_filter = { - compress_start_transport_stream_op, - grpc_channel_next_op, - sizeof(call_data), - init_call_elem, - destroy_call_elem, - sizeof(channel_data), - init_channel_elem, - destroy_channel_elem, - grpc_call_next_get_peer, - "compress"}; + compress_start_transport_stream_op, + grpc_channel_next_op, + sizeof (call_data), + init_call_elem, + destroy_call_elem, + sizeof (channel_data), + init_channel_elem, + destroy_channel_elem, + grpc_call_next_get_peer, + "compress" +}; diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index b58e180a43..7ba412fe5c 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -46,11 +46,15 @@ #define MAX_BUFFER_LENGTH 8192 -typedef struct connected_channel_channel_data { +typedef struct connected_channel_channel_data +{ grpc_transport *transport; } channel_data; -typedef struct connected_channel_call_data { void *unused; } call_data; +typedef struct connected_channel_call_data +{ + void *unused; +} call_data; /* We perform a small hack to locate transport data alongside the connected channel data in call allocations, to allow everything to be pulled in minimal @@ -61,98 +65,95 @@ typedef struct connected_channel_call_data { void *unused; } call_data; /* Intercept a call operation and either push it directly up or translate it into transport stream operations */ -static void con_start_transport_stream_op(grpc_call_element *elem, - grpc_transport_stream_op *op, - grpc_closure_list *closure_list) { +static void +con_start_transport_stream_op (grpc_call_element * elem, grpc_transport_stream_op * op, grpc_closure_list * closure_list) +{ call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + GPR_ASSERT (elem->filter == &grpc_connected_channel_filter); + GRPC_CALL_LOG_OP (GPR_INFO, elem, op); - grpc_transport_perform_stream_op(chand->transport, - TRANSPORT_STREAM_FROM_CALL_DATA(calld), op, - closure_list); + grpc_transport_perform_stream_op (chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA (calld), op, closure_list); } -static void con_start_transport_op(grpc_channel_element *elem, - grpc_transport_op *op, - grpc_closure_list *closure_list) { +static void +con_start_transport_op (grpc_channel_element * elem, grpc_transport_op * op, grpc_closure_list * closure_list) +{ channel_data *chand = elem->channel_data; - grpc_transport_perform_op(chand->transport, op, closure_list); + grpc_transport_perform_op (chand->transport, op, closure_list); } /* Constructor for call_data */ -static void init_call_elem(grpc_call_element *elem, - const void *server_transport_data, - grpc_transport_stream_op *initial_op, - grpc_closure_list *closure_list) { +static void +init_call_elem (grpc_call_element * elem, const void *server_transport_data, grpc_transport_stream_op * initial_op, grpc_closure_list * closure_list) +{ call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; int r; - GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); - r = grpc_transport_init_stream( - chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), - server_transport_data, initial_op, closure_list); - GPR_ASSERT(r == 0); + GPR_ASSERT (elem->filter == &grpc_connected_channel_filter); + r = grpc_transport_init_stream (chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA (calld), server_transport_data, initial_op, closure_list); + GPR_ASSERT (r == 0); } /* Destructor for call_data */ -static void destroy_call_elem(grpc_call_element *elem, - grpc_closure_list *closure_list) { +static void +destroy_call_elem (grpc_call_element * elem, grpc_closure_list * closure_list) +{ call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); - grpc_transport_destroy_stream( - chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), closure_list); + GPR_ASSERT (elem->filter == &grpc_connected_channel_filter); + grpc_transport_destroy_stream (chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA (calld), closure_list); } /* Constructor for channel_data */ -static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, - const grpc_channel_args *args, grpc_mdctx *mdctx, - int is_first, int is_last, - grpc_closure_list *closure_list) { - channel_data *cd = (channel_data *)elem->channel_data; - GPR_ASSERT(is_last); - GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); +static void +init_channel_elem (grpc_channel_element * elem, grpc_channel * master, const grpc_channel_args * args, grpc_mdctx * mdctx, int is_first, int is_last, grpc_closure_list * closure_list) +{ + channel_data *cd = (channel_data *) elem->channel_data; + GPR_ASSERT (is_last); + GPR_ASSERT (elem->filter == &grpc_connected_channel_filter); cd->transport = NULL; } /* Destructor for channel_data */ -static void destroy_channel_elem(grpc_channel_element *elem, - grpc_closure_list *closure_list) { - channel_data *cd = (channel_data *)elem->channel_data; - GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); - grpc_transport_destroy(cd->transport, closure_list); +static void +destroy_channel_elem (grpc_channel_element * elem, grpc_closure_list * closure_list) +{ + channel_data *cd = (channel_data *) elem->channel_data; + GPR_ASSERT (elem->filter == &grpc_connected_channel_filter); + grpc_transport_destroy (cd->transport, closure_list); } -static char *con_get_peer(grpc_call_element *elem, - grpc_closure_list *closure_list) { +static char * +con_get_peer (grpc_call_element * elem, grpc_closure_list * closure_list) +{ channel_data *chand = elem->channel_data; - return grpc_transport_get_peer(chand->transport, closure_list); + return grpc_transport_get_peer (chand->transport, closure_list); } const grpc_channel_filter grpc_connected_channel_filter = { - con_start_transport_stream_op, - con_start_transport_op, - sizeof(call_data), - init_call_elem, - destroy_call_elem, - sizeof(channel_data), - init_channel_elem, - destroy_channel_elem, - con_get_peer, - "connected", + con_start_transport_stream_op, + con_start_transport_op, + sizeof (call_data), + init_call_elem, + destroy_call_elem, + sizeof (channel_data), + init_channel_elem, + destroy_channel_elem, + con_get_peer, + "connected", }; -void grpc_connected_channel_bind_transport(grpc_channel_stack *channel_stack, - grpc_transport *transport) { +void +grpc_connected_channel_bind_transport (grpc_channel_stack * channel_stack, grpc_transport * transport) +{ /* Assumes that the connected channel filter is always the last filter in a channel stack */ - grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack); - channel_data *cd = (channel_data *)elem->channel_data; - GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); - GPR_ASSERT(cd->transport == NULL); + grpc_channel_element *elem = grpc_channel_stack_last_element (channel_stack); + channel_data *cd = (channel_data *) elem->channel_data; + GPR_ASSERT (elem->filter == &grpc_connected_channel_filter); + GPR_ASSERT (cd->transport == NULL); cd->transport = transport; /* HACK(ctiller): increase call stack size for the channel to make space @@ -161,5 +162,5 @@ void grpc_connected_channel_bind_transport(grpc_channel_stack *channel_stack, This is only "safe" because call stacks place no additional data after the last call element, and the last call element MUST be the connected channel. */ - channel_stack->call_stack_size += grpc_transport_stream_size(transport); + channel_stack->call_stack_size += grpc_transport_stream_size (transport); } diff --git a/src/core/channel/connected_channel.h b/src/core/channel/connected_channel.h index b615b0d350..67bcd6c911 100644 --- a/src/core/channel/connected_channel.h +++ b/src/core/channel/connected_channel.h @@ -43,7 +43,6 @@ extern const grpc_channel_filter grpc_connected_channel_filter; /* Post construction fixup: set the transport in the connected channel. Must be called before any call stack using this filter is used. */ -void grpc_connected_channel_bind_transport(grpc_channel_stack *channel_stack, - grpc_transport *transport); +void grpc_connected_channel_bind_transport (grpc_channel_stack * channel_stack, grpc_transport * transport); #endif /* GRPC_INTERNAL_CORE_CHANNEL_CONNECTED_CHANNEL_H */ diff --git a/src/core/channel/context.h b/src/core/channel/context.h index ac5796b9ef..076dc7189b 100644 --- a/src/core/channel/context.h +++ b/src/core/channel/context.h @@ -35,15 +35,17 @@ #define GRPC_INTERNAL_CORE_CHANNEL_CONTEXT_H /* Call object context pointers */ -typedef enum { +typedef enum +{ GRPC_CONTEXT_SECURITY = 0, GRPC_CONTEXT_TRACING, GRPC_CONTEXT_COUNT } grpc_context_index; -typedef struct { +typedef struct +{ void *value; - void (*destroy)(void *); + void (*destroy) (void *); } grpc_call_context_element; #endif /* GRPC_INTERNAL_CORE_CHANNEL_CONTEXT_H */ diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index 6fbe4738e7..1a166833e7 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -37,7 +37,8 @@ #include <grpc/support/string_util.h> #include "src/core/support/string.h" -typedef struct call_data { +typedef struct call_data +{ grpc_linked_mdelem method; grpc_linked_mdelem scheme; grpc_linked_mdelem authority; @@ -57,7 +58,8 @@ typedef struct call_data { grpc_closure hc_on_recv; } call_data; -typedef struct channel_data { +typedef struct channel_data +{ grpc_mdelem *te_trailers; grpc_mdelem *method; grpc_mdelem *scheme; @@ -67,224 +69,255 @@ typedef struct channel_data { grpc_mdelem *user_agent; } channel_data; -typedef struct { +typedef struct +{ grpc_call_element *elem; grpc_closure_list *closure_list; } client_recv_filter_args; -static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) { +static grpc_mdelem * +client_recv_filter (void *user_data, grpc_mdelem * md) +{ client_recv_filter_args *a = user_data; grpc_call_element *elem = a->elem; channel_data *channeld = elem->channel_data; - if (md == channeld->status) { - return NULL; - } else if (md->key == channeld->status->key) { - grpc_call_element_send_cancel(elem, a->closure_list); - return NULL; - } else if (md->key == channeld->content_type->key) { - return NULL; - } + if (md == channeld->status) + { + return NULL; + } + else if (md->key == channeld->status->key) + { + grpc_call_element_send_cancel (elem, a->closure_list); + return NULL; + } + else if (md->key == channeld->content_type->key) + { + return NULL; + } return md; } -static void hc_on_recv(void *user_data, int success, - grpc_closure_list *closure_list) { +static void +hc_on_recv (void *user_data, int success, grpc_closure_list * closure_list) +{ grpc_call_element *elem = user_data; call_data *calld = elem->call_data; size_t i; size_t nops = calld->recv_ops->nops; grpc_stream_op *ops = calld->recv_ops->ops; - for (i = 0; i < nops; i++) { - grpc_stream_op *op = &ops[i]; - client_recv_filter_args a; - if (op->type != GRPC_OP_METADATA) continue; - calld->got_initial_metadata = 1; - a.elem = elem; - a.closure_list = closure_list; - grpc_metadata_batch_filter(&op->data.metadata, client_recv_filter, &a); - } - calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success, closure_list); + for (i = 0; i < nops; i++) + { + grpc_stream_op *op = &ops[i]; + client_recv_filter_args a; + if (op->type != GRPC_OP_METADATA) + continue; + calld->got_initial_metadata = 1; + a.elem = elem; + a.closure_list = closure_list; + grpc_metadata_batch_filter (&op->data.metadata, client_recv_filter, &a); + } + calld->on_done_recv->cb (calld->on_done_recv->cb_arg, success, closure_list); } -static grpc_mdelem *client_strip_filter(void *user_data, grpc_mdelem *md) { +static grpc_mdelem * +client_strip_filter (void *user_data, grpc_mdelem * md) +{ grpc_call_element *elem = user_data; channel_data *channeld = elem->channel_data; /* eat the things we'd like to set ourselves */ - if (md->key == channeld->method->key) return NULL; - if (md->key == channeld->scheme->key) return NULL; - if (md->key == channeld->te_trailers->key) return NULL; - if (md->key == channeld->content_type->key) return NULL; - if (md->key == channeld->user_agent->key) return NULL; + if (md->key == channeld->method->key) + return NULL; + if (md->key == channeld->scheme->key) + return NULL; + if (md->key == channeld->te_trailers->key) + return NULL; + if (md->key == channeld->content_type->key) + return NULL; + if (md->key == channeld->user_agent->key) + return NULL; return md; } -static void hc_mutate_op(grpc_call_element *elem, - grpc_transport_stream_op *op) { +static void +hc_mutate_op (grpc_call_element * elem, grpc_transport_stream_op * op) +{ /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; size_t i; - if (op->send_ops && !calld->sent_initial_metadata) { - size_t nops = op->send_ops->nops; - grpc_stream_op *ops = op->send_ops->ops; - for (i = 0; i < nops; i++) { - grpc_stream_op *op = &ops[i]; - if (op->type != GRPC_OP_METADATA) continue; - calld->sent_initial_metadata = 1; - grpc_metadata_batch_filter(&op->data.metadata, client_strip_filter, elem); - /* Send : prefixed headers, which have to be before any application - layer headers. */ - grpc_metadata_batch_add_head(&op->data.metadata, &calld->method, - GRPC_MDELEM_REF(channeld->method)); - grpc_metadata_batch_add_head(&op->data.metadata, &calld->scheme, - GRPC_MDELEM_REF(channeld->scheme)); - grpc_metadata_batch_add_tail(&op->data.metadata, &calld->te_trailers, - GRPC_MDELEM_REF(channeld->te_trailers)); - grpc_metadata_batch_add_tail(&op->data.metadata, &calld->content_type, - GRPC_MDELEM_REF(channeld->content_type)); - grpc_metadata_batch_add_tail(&op->data.metadata, &calld->user_agent, - GRPC_MDELEM_REF(channeld->user_agent)); - break; + if (op->send_ops && !calld->sent_initial_metadata) + { + size_t nops = op->send_ops->nops; + grpc_stream_op *ops = op->send_ops->ops; + for (i = 0; i < nops; i++) + { + grpc_stream_op *op = &ops[i]; + if (op->type != GRPC_OP_METADATA) + continue; + calld->sent_initial_metadata = 1; + grpc_metadata_batch_filter (&op->data.metadata, client_strip_filter, elem); + /* Send : prefixed headers, which have to be before any application + layer headers. */ + grpc_metadata_batch_add_head (&op->data.metadata, &calld->method, GRPC_MDELEM_REF (channeld->method)); + grpc_metadata_batch_add_head (&op->data.metadata, &calld->scheme, GRPC_MDELEM_REF (channeld->scheme)); + grpc_metadata_batch_add_tail (&op->data.metadata, &calld->te_trailers, GRPC_MDELEM_REF (channeld->te_trailers)); + grpc_metadata_batch_add_tail (&op->data.metadata, &calld->content_type, GRPC_MDELEM_REF (channeld->content_type)); + grpc_metadata_batch_add_tail (&op->data.metadata, &calld->user_agent, GRPC_MDELEM_REF (channeld->user_agent)); + break; + } } - } - if (op->recv_ops && !calld->got_initial_metadata) { - /* substitute our callback for the higher callback */ - calld->recv_ops = op->recv_ops; - calld->on_done_recv = op->on_done_recv; - op->on_done_recv = &calld->hc_on_recv; - } + if (op->recv_ops && !calld->got_initial_metadata) + { + /* substitute our callback for the higher callback */ + calld->recv_ops = op->recv_ops; + calld->on_done_recv = op->on_done_recv; + op->on_done_recv = &calld->hc_on_recv; + } } -static void hc_start_transport_op(grpc_call_element *elem, - grpc_transport_stream_op *op, - grpc_closure_list *closure_list) { - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - hc_mutate_op(elem, op); - grpc_call_next_op(elem, op, closure_list); +static void +hc_start_transport_op (grpc_call_element * elem, grpc_transport_stream_op * op, grpc_closure_list * closure_list) +{ + GRPC_CALL_LOG_OP (GPR_INFO, elem, op); + hc_mutate_op (elem, op); + grpc_call_next_op (elem, op, closure_list); } /* Constructor for call_data */ -static void init_call_elem(grpc_call_element *elem, - const void *server_transport_data, - grpc_transport_stream_op *initial_op, - grpc_closure_list *closure_list) { +static void +init_call_elem (grpc_call_element * elem, const void *server_transport_data, grpc_transport_stream_op * initial_op, grpc_closure_list * closure_list) +{ call_data *calld = elem->call_data; calld->sent_initial_metadata = 0; calld->got_initial_metadata = 0; calld->on_done_recv = NULL; - grpc_closure_init(&calld->hc_on_recv, hc_on_recv, elem); - if (initial_op) hc_mutate_op(elem, initial_op); + grpc_closure_init (&calld->hc_on_recv, hc_on_recv, elem); + if (initial_op) + hc_mutate_op (elem, initial_op); } /* Destructor for call_data */ -static void destroy_call_elem(grpc_call_element *elem, - grpc_closure_list *closure_list) {} +static void +destroy_call_elem (grpc_call_element * elem, grpc_closure_list * closure_list) +{ +} -static const char *scheme_from_args(const grpc_channel_args *args) { +static const char * +scheme_from_args (const grpc_channel_args * args) +{ unsigned i; - if (args != NULL) { - for (i = 0; i < args->num_args; ++i) { - if (args->args[i].type == GRPC_ARG_STRING && - strcmp(args->args[i].key, GRPC_ARG_HTTP2_SCHEME) == 0) { - return args->args[i].value.string; - } + if (args != NULL) + { + for (i = 0; i < args->num_args; ++i) + { + if (args->args[i].type == GRPC_ARG_STRING && strcmp (args->args[i].key, GRPC_ARG_HTTP2_SCHEME) == 0) + { + return args->args[i].value.string; + } + } } - } return "http"; } -static grpc_mdstr *user_agent_from_args(grpc_mdctx *mdctx, - const grpc_channel_args *args) { +static grpc_mdstr * +user_agent_from_args (grpc_mdctx * mdctx, const grpc_channel_args * args) +{ gpr_strvec v; size_t i; int is_first = 1; char *tmp; grpc_mdstr *result; - gpr_strvec_init(&v); + gpr_strvec_init (&v); - for (i = 0; args && i < args->num_args; i++) { - if (0 == strcmp(args->args[i].key, GRPC_ARG_PRIMARY_USER_AGENT_STRING)) { - if (args->args[i].type != GRPC_ARG_STRING) { - gpr_log(GPR_ERROR, "Channel argument '%s' should be a string", - GRPC_ARG_PRIMARY_USER_AGENT_STRING); - } else { - if (!is_first) gpr_strvec_add(&v, gpr_strdup(" ")); - is_first = 0; - gpr_strvec_add(&v, gpr_strdup(args->args[i].value.string)); - } + for (i = 0; args && i < args->num_args; i++) + { + if (0 == strcmp (args->args[i].key, GRPC_ARG_PRIMARY_USER_AGENT_STRING)) + { + if (args->args[i].type != GRPC_ARG_STRING) + { + gpr_log (GPR_ERROR, "Channel argument '%s' should be a string", GRPC_ARG_PRIMARY_USER_AGENT_STRING); + } + else + { + if (!is_first) + gpr_strvec_add (&v, gpr_strdup (" ")); + is_first = 0; + gpr_strvec_add (&v, gpr_strdup (args->args[i].value.string)); + } + } } - } - gpr_asprintf(&tmp, "%sgrpc-c/%s (%s)", is_first ? "" : " ", - grpc_version_string(), GPR_PLATFORM_STRING); + gpr_asprintf (&tmp, "%sgrpc-c/%s (%s)", is_first ? "" : " ", grpc_version_string (), GPR_PLATFORM_STRING); is_first = 0; - gpr_strvec_add(&v, tmp); + gpr_strvec_add (&v, tmp); - for (i = 0; args && i < args->num_args; i++) { - if (0 == strcmp(args->args[i].key, GRPC_ARG_SECONDARY_USER_AGENT_STRING)) { - if (args->args[i].type != GRPC_ARG_STRING) { - gpr_log(GPR_ERROR, "Channel argument '%s' should be a string", - GRPC_ARG_SECONDARY_USER_AGENT_STRING); - } else { - if (!is_first) gpr_strvec_add(&v, gpr_strdup(" ")); - is_first = 0; - gpr_strvec_add(&v, gpr_strdup(args->args[i].value.string)); - } + for (i = 0; args && i < args->num_args; i++) + { + if (0 == strcmp (args->args[i].key, GRPC_ARG_SECONDARY_USER_AGENT_STRING)) + { + if (args->args[i].type != GRPC_ARG_STRING) + { + gpr_log (GPR_ERROR, "Channel argument '%s' should be a string", GRPC_ARG_SECONDARY_USER_AGENT_STRING); + } + else + { + if (!is_first) + gpr_strvec_add (&v, gpr_strdup (" ")); + is_first = 0; + gpr_strvec_add (&v, gpr_strdup (args->args[i].value.string)); + } + } } - } - tmp = gpr_strvec_flatten(&v, NULL); - gpr_strvec_destroy(&v); - result = grpc_mdstr_from_string(mdctx, tmp, 0); - gpr_free(tmp); + tmp = gpr_strvec_flatten (&v, NULL); + gpr_strvec_destroy (&v); + result = grpc_mdstr_from_string (mdctx, tmp, 0); + gpr_free (tmp); return result; } /* Constructor for channel_data */ -static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, - const grpc_channel_args *channel_args, - grpc_mdctx *mdctx, int is_first, int is_last, - grpc_closure_list *closure_list) { +static void +init_channel_elem (grpc_channel_element * elem, grpc_channel * master, const grpc_channel_args * channel_args, grpc_mdctx * mdctx, int is_first, int is_last, grpc_closure_list * closure_list) +{ /* grab pointers to our data from the channel element */ channel_data *channeld = elem->channel_data; /* The first and the last filters tend to be implemented differently to handle the case that there's no 'next' filter to call on the up or down path */ - GPR_ASSERT(!is_last); + GPR_ASSERT (!is_last); /* initialize members */ - channeld->te_trailers = grpc_mdelem_from_strings(mdctx, "te", "trailers"); - channeld->method = grpc_mdelem_from_strings(mdctx, ":method", "POST"); - channeld->scheme = grpc_mdelem_from_strings(mdctx, ":scheme", - scheme_from_args(channel_args)); - channeld->content_type = - grpc_mdelem_from_strings(mdctx, "content-type", "application/grpc"); - channeld->status = grpc_mdelem_from_strings(mdctx, ":status", "200"); - channeld->user_agent = grpc_mdelem_from_metadata_strings( - mdctx, grpc_mdstr_from_string(mdctx, "user-agent", 0), - user_agent_from_args(mdctx, channel_args)); + channeld->te_trailers = grpc_mdelem_from_strings (mdctx, "te", "trailers"); + channeld->method = grpc_mdelem_from_strings (mdctx, ":method", "POST"); + channeld->scheme = grpc_mdelem_from_strings (mdctx, ":scheme", scheme_from_args (channel_args)); + channeld->content_type = grpc_mdelem_from_strings (mdctx, "content-type", "application/grpc"); + channeld->status = grpc_mdelem_from_strings (mdctx, ":status", "200"); + channeld->user_agent = grpc_mdelem_from_metadata_strings (mdctx, grpc_mdstr_from_string (mdctx, "user-agent", 0), user_agent_from_args (mdctx, channel_args)); } /* Destructor for channel data */ -static void destroy_channel_elem(grpc_channel_element *elem, - grpc_closure_list *closure_list) { +static void +destroy_channel_elem (grpc_channel_element * elem, grpc_closure_list * closure_list) +{ /* grab pointers to our data from the channel element */ channel_data *channeld = elem->channel_data; - GRPC_MDELEM_UNREF(channeld->te_trailers); - GRPC_MDELEM_UNREF(channeld->method); - GRPC_MDELEM_UNREF(channeld->scheme); - GRPC_MDELEM_UNREF(channeld->content_type); - GRPC_MDELEM_UNREF(channeld->status); - GRPC_MDELEM_UNREF(channeld->user_agent); + GRPC_MDELEM_UNREF (channeld->te_trailers); + GRPC_MDELEM_UNREF (channeld->method); + GRPC_MDELEM_UNREF (channeld->scheme); + GRPC_MDELEM_UNREF (channeld->content_type); + GRPC_MDELEM_UNREF (channeld->status); + GRPC_MDELEM_UNREF (channeld->user_agent); } const grpc_channel_filter grpc_http_client_filter = { - hc_start_transport_op, grpc_channel_next_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, - "http-client"}; + hc_start_transport_op, grpc_channel_next_op, sizeof (call_data), + init_call_elem, destroy_call_elem, sizeof (channel_data), + init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + "http-client" +}; diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index fb1f0b0554..949868b403 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -37,7 +37,8 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> -typedef struct call_data { +typedef struct call_data +{ gpr_uint8 got_initial_metadata; gpr_uint8 seen_path; gpr_uint8 seen_post; @@ -57,7 +58,8 @@ typedef struct call_data { grpc_closure hs_on_recv; } call_data; -typedef struct channel_data { +typedef struct channel_data +{ grpc_mdelem *te_trailers; grpc_mdelem *method_post; grpc_mdelem *http_scheme; @@ -74,236 +76,268 @@ typedef struct channel_data { grpc_mdctx *mdctx; } channel_data; -typedef struct { +typedef struct +{ grpc_call_element *elem; grpc_closure_list *closure_list; } server_filter_args; -static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { +static grpc_mdelem * +server_filter (void *user_data, grpc_mdelem * md) +{ server_filter_args *a = user_data; grpc_call_element *elem = a->elem; channel_data *channeld = elem->channel_data; call_data *calld = elem->call_data; /* Check if it is one of the headers we care about. */ - if (md == channeld->te_trailers || md == channeld->method_post || - md == channeld->http_scheme || md == channeld->https_scheme || - md == channeld->grpc_scheme || md == channeld->content_type) { - /* swallow it */ - if (md == channeld->method_post) { - calld->seen_post = 1; - } else if (md->key == channeld->http_scheme->key) { - calld->seen_scheme = 1; - } else if (md == channeld->te_trailers) { - calld->seen_te_trailers = 1; + if (md == channeld->te_trailers || md == channeld->method_post || md == channeld->http_scheme || md == channeld->https_scheme || md == channeld->grpc_scheme || md == channeld->content_type) + { + /* swallow it */ + if (md == channeld->method_post) + { + calld->seen_post = 1; + } + else if (md->key == channeld->http_scheme->key) + { + calld->seen_scheme = 1; + } + else if (md == channeld->te_trailers) + { + calld->seen_te_trailers = 1; + } + /* TODO(klempner): Track that we've seen all the headers we should + require */ + return NULL; } - /* TODO(klempner): Track that we've seen all the headers we should - require */ - return NULL; - } else if (md->key == channeld->content_type->key) { - if (strncmp(grpc_mdstr_as_c_string(md->value), "application/grpc+", 17) == - 0) { - /* Although the C implementation doesn't (currently) generate them, - any custom +-suffix is explicitly valid. */ - /* TODO(klempner): We should consider preallocating common values such - as +proto or +json, or at least stashing them if we see them. */ - /* TODO(klempner): Should we be surfacing this to application code? */ - } else { - /* TODO(klempner): We're currently allowing this, but we shouldn't - see it without a proxy so log for now. */ - gpr_log(GPR_INFO, "Unexpected content-type %s", - channeld->content_type->key); + else if (md->key == channeld->content_type->key) + { + if (strncmp (grpc_mdstr_as_c_string (md->value), "application/grpc+", 17) == 0) + { + /* Although the C implementation doesn't (currently) generate them, + any custom +-suffix is explicitly valid. */ + /* TODO(klempner): We should consider preallocating common values such + as +proto or +json, or at least stashing them if we see them. */ + /* TODO(klempner): Should we be surfacing this to application code? */ + } + else + { + /* TODO(klempner): We're currently allowing this, but we shouldn't + see it without a proxy so log for now. */ + gpr_log (GPR_INFO, "Unexpected content-type %s", channeld->content_type->key); + } + return NULL; } - return NULL; - } else if (md->key == channeld->te_trailers->key || - md->key == channeld->method_post->key || - md->key == channeld->http_scheme->key) { - gpr_log(GPR_ERROR, "Invalid %s: header: '%s'", - grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value)); - /* swallow it and error everything out. */ - /* TODO(klempner): We ought to generate more descriptive error messages - on the wire here. */ - grpc_call_element_send_cancel(elem, a->closure_list); - return NULL; - } else if (md->key == channeld->path_key) { - if (calld->seen_path) { - gpr_log(GPR_ERROR, "Received :path twice"); + else if (md->key == channeld->te_trailers->key || md->key == channeld->method_post->key || md->key == channeld->http_scheme->key) + { + gpr_log (GPR_ERROR, "Invalid %s: header: '%s'", grpc_mdstr_as_c_string (md->key), grpc_mdstr_as_c_string (md->value)); + /* swallow it and error everything out. */ + /* TODO(klempner): We ought to generate more descriptive error messages + on the wire here. */ + grpc_call_element_send_cancel (elem, a->closure_list); return NULL; } - calld->seen_path = 1; - return md; - } else if (md->key == channeld->authority_key) { - calld->seen_authority = 1; - return md; - } else if (md->key == channeld->host_key) { - /* translate host to :authority since :authority may be - omitted */ - grpc_mdelem *authority = grpc_mdelem_from_metadata_strings( - channeld->mdctx, GRPC_MDSTR_REF(channeld->authority_key), - GRPC_MDSTR_REF(md->value)); - GRPC_MDELEM_UNREF(md); - calld->seen_authority = 1; - return authority; - } else { - return md; - } + else if (md->key == channeld->path_key) + { + if (calld->seen_path) + { + gpr_log (GPR_ERROR, "Received :path twice"); + return NULL; + } + calld->seen_path = 1; + return md; + } + else if (md->key == channeld->authority_key) + { + calld->seen_authority = 1; + return md; + } + else if (md->key == channeld->host_key) + { + /* translate host to :authority since :authority may be + omitted */ + grpc_mdelem *authority = grpc_mdelem_from_metadata_strings (channeld->mdctx, GRPC_MDSTR_REF (channeld->authority_key), + GRPC_MDSTR_REF (md->value)); + GRPC_MDELEM_UNREF (md); + calld->seen_authority = 1; + return authority; + } + else + { + return md; + } } -static void hs_on_recv(void *user_data, int success, - grpc_closure_list *closure_list) { +static void +hs_on_recv (void *user_data, int success, grpc_closure_list * closure_list) +{ grpc_call_element *elem = user_data; call_data *calld = elem->call_data; - if (success) { - size_t i; - size_t nops = calld->recv_ops->nops; - grpc_stream_op *ops = calld->recv_ops->ops; - for (i = 0; i < nops; i++) { - grpc_stream_op *op = &ops[i]; - server_filter_args a; - if (op->type != GRPC_OP_METADATA) continue; - calld->got_initial_metadata = 1; - a.elem = elem; - a.closure_list = closure_list; - grpc_metadata_batch_filter(&op->data.metadata, server_filter, &a); - /* Have we seen the required http2 transport headers? - (:method, :scheme, content-type, with :path and :authority covered - at the channel level right now) */ - if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers && - calld->seen_path && calld->seen_authority) { - /* do nothing */ - } else { - if (!calld->seen_path) { - gpr_log(GPR_ERROR, "Missing :path header"); - } - if (!calld->seen_authority) { - gpr_log(GPR_ERROR, "Missing :authority header"); - } - if (!calld->seen_post) { - gpr_log(GPR_ERROR, "Missing :method header"); - } - if (!calld->seen_scheme) { - gpr_log(GPR_ERROR, "Missing :scheme header"); - } - if (!calld->seen_te_trailers) { - gpr_log(GPR_ERROR, "Missing te trailers header"); - } - /* Error this call out */ - success = 0; - grpc_call_element_send_cancel(elem, closure_list); - } + if (success) + { + size_t i; + size_t nops = calld->recv_ops->nops; + grpc_stream_op *ops = calld->recv_ops->ops; + for (i = 0; i < nops; i++) + { + grpc_stream_op *op = &ops[i]; + server_filter_args a; + if (op->type != GRPC_OP_METADATA) + continue; + calld->got_initial_metadata = 1; + a.elem = elem; + a.closure_list = closure_list; + grpc_metadata_batch_filter (&op->data.metadata, server_filter, &a); + /* Have we seen the required http2 transport headers? + (:method, :scheme, content-type, with :path and :authority covered + at the channel level right now) */ + if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers && calld->seen_path && calld->seen_authority) + { + /* do nothing */ + } + else + { + if (!calld->seen_path) + { + gpr_log (GPR_ERROR, "Missing :path header"); + } + if (!calld->seen_authority) + { + gpr_log (GPR_ERROR, "Missing :authority header"); + } + if (!calld->seen_post) + { + gpr_log (GPR_ERROR, "Missing :method header"); + } + if (!calld->seen_scheme) + { + gpr_log (GPR_ERROR, "Missing :scheme header"); + } + if (!calld->seen_te_trailers) + { + gpr_log (GPR_ERROR, "Missing te trailers header"); + } + /* Error this call out */ + success = 0; + grpc_call_element_send_cancel (elem, closure_list); + } + } } - } - calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success, closure_list); + calld->on_done_recv->cb (calld->on_done_recv->cb_arg, success, closure_list); } -static void hs_mutate_op(grpc_call_element *elem, - grpc_transport_stream_op *op) { +static void +hs_mutate_op (grpc_call_element * elem, grpc_transport_stream_op * op) +{ /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; size_t i; - if (op->send_ops && !calld->sent_status) { - size_t nops = op->send_ops->nops; - grpc_stream_op *ops = op->send_ops->ops; - for (i = 0; i < nops; i++) { - grpc_stream_op *op = &ops[i]; - if (op->type != GRPC_OP_METADATA) continue; - calld->sent_status = 1; - grpc_metadata_batch_add_head(&op->data.metadata, &calld->status, - GRPC_MDELEM_REF(channeld->status_ok)); - grpc_metadata_batch_add_tail(&op->data.metadata, &calld->content_type, - GRPC_MDELEM_REF(channeld->content_type)); - break; + if (op->send_ops && !calld->sent_status) + { + size_t nops = op->send_ops->nops; + grpc_stream_op *ops = op->send_ops->ops; + for (i = 0; i < nops; i++) + { + grpc_stream_op *op = &ops[i]; + if (op->type != GRPC_OP_METADATA) + continue; + calld->sent_status = 1; + grpc_metadata_batch_add_head (&op->data.metadata, &calld->status, GRPC_MDELEM_REF (channeld->status_ok)); + grpc_metadata_batch_add_tail (&op->data.metadata, &calld->content_type, GRPC_MDELEM_REF (channeld->content_type)); + break; + } } - } - if (op->recv_ops && !calld->got_initial_metadata) { - /* substitute our callback for the higher callback */ - calld->recv_ops = op->recv_ops; - calld->on_done_recv = op->on_done_recv; - op->on_done_recv = &calld->hs_on_recv; - } + if (op->recv_ops && !calld->got_initial_metadata) + { + /* substitute our callback for the higher callback */ + calld->recv_ops = op->recv_ops; + calld->on_done_recv = op->on_done_recv; + op->on_done_recv = &calld->hs_on_recv; + } } -static void hs_start_transport_op(grpc_call_element *elem, - grpc_transport_stream_op *op, - grpc_closure_list *closure_list) { - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - hs_mutate_op(elem, op); - grpc_call_next_op(elem, op, closure_list); +static void +hs_start_transport_op (grpc_call_element * elem, grpc_transport_stream_op * op, grpc_closure_list * closure_list) +{ + GRPC_CALL_LOG_OP (GPR_INFO, elem, op); + hs_mutate_op (elem, op); + grpc_call_next_op (elem, op, closure_list); } /* Constructor for call_data */ -static void init_call_elem(grpc_call_element *elem, - const void *server_transport_data, - grpc_transport_stream_op *initial_op, - grpc_closure_list *closure_list) { +static void +init_call_elem (grpc_call_element * elem, const void *server_transport_data, grpc_transport_stream_op * initial_op, grpc_closure_list * closure_list) +{ /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; /* initialize members */ - memset(calld, 0, sizeof(*calld)); - grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem); - if (initial_op) hs_mutate_op(elem, initial_op); + memset (calld, 0, sizeof (*calld)); + grpc_closure_init (&calld->hs_on_recv, hs_on_recv, elem); + if (initial_op) + hs_mutate_op (elem, initial_op); } /* Destructor for call_data */ -static void destroy_call_elem(grpc_call_element *elem, - grpc_closure_list *closure_list) {} +static void +destroy_call_elem (grpc_call_element * elem, grpc_closure_list * closure_list) +{ +} /* Constructor for channel_data */ -static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, - const grpc_channel_args *args, grpc_mdctx *mdctx, - int is_first, int is_last, - grpc_closure_list *closure_list) { +static void +init_channel_elem (grpc_channel_element * elem, grpc_channel * master, const grpc_channel_args * args, grpc_mdctx * mdctx, int is_first, int is_last, grpc_closure_list * closure_list) +{ /* grab pointers to our data from the channel element */ channel_data *channeld = elem->channel_data; /* The first and the last filters tend to be implemented differently to handle the case that there's no 'next' filter to call on the up or down path */ - GPR_ASSERT(!is_first); - GPR_ASSERT(!is_last); + GPR_ASSERT (!is_first); + GPR_ASSERT (!is_last); /* initialize members */ - channeld->te_trailers = grpc_mdelem_from_strings(mdctx, "te", "trailers"); - channeld->status_ok = grpc_mdelem_from_strings(mdctx, ":status", "200"); - channeld->status_not_found = - grpc_mdelem_from_strings(mdctx, ":status", "404"); - channeld->method_post = grpc_mdelem_from_strings(mdctx, ":method", "POST"); - channeld->http_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "http"); - channeld->https_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "https"); - channeld->grpc_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "grpc"); - channeld->path_key = grpc_mdstr_from_string(mdctx, ":path", 0); - channeld->authority_key = grpc_mdstr_from_string(mdctx, ":authority", 0); - channeld->host_key = grpc_mdstr_from_string(mdctx, "host", 0); - channeld->content_type = - grpc_mdelem_from_strings(mdctx, "content-type", "application/grpc"); + channeld->te_trailers = grpc_mdelem_from_strings (mdctx, "te", "trailers"); + channeld->status_ok = grpc_mdelem_from_strings (mdctx, ":status", "200"); + channeld->status_not_found = grpc_mdelem_from_strings (mdctx, ":status", "404"); + channeld->method_post = grpc_mdelem_from_strings (mdctx, ":method", "POST"); + channeld->http_scheme = grpc_mdelem_from_strings (mdctx, ":scheme", "http"); + channeld->https_scheme = grpc_mdelem_from_strings (mdctx, ":scheme", "https"); + channeld->grpc_scheme = grpc_mdelem_from_strings (mdctx, ":scheme", "grpc"); + channeld->path_key = grpc_mdstr_from_string (mdctx, ":path", 0); + channeld->authority_key = grpc_mdstr_from_string (mdctx, ":authority", 0); + channeld->host_key = grpc_mdstr_from_string (mdctx, "host", 0); + channeld->content_type = grpc_mdelem_from_strings (mdctx, "content-type", "application/grpc"); channeld->mdctx = mdctx; } /* Destructor for channel data */ -static void destroy_channel_elem(grpc_channel_element *elem, - grpc_closure_list *closure_list) { +static void +destroy_channel_elem (grpc_channel_element * elem, grpc_closure_list * closure_list) +{ /* grab pointers to our data from the channel element */ channel_data *channeld = elem->channel_data; - GRPC_MDELEM_UNREF(channeld->te_trailers); - GRPC_MDELEM_UNREF(channeld->status_ok); - GRPC_MDELEM_UNREF(channeld->status_not_found); - GRPC_MDELEM_UNREF(channeld->method_post); - GRPC_MDELEM_UNREF(channeld->http_scheme); - GRPC_MDELEM_UNREF(channeld->https_scheme); - GRPC_MDELEM_UNREF(channeld->grpc_scheme); - GRPC_MDELEM_UNREF(channeld->content_type); - GRPC_MDSTR_UNREF(channeld->path_key); - GRPC_MDSTR_UNREF(channeld->authority_key); - GRPC_MDSTR_UNREF(channeld->host_key); + GRPC_MDELEM_UNREF (channeld->te_trailers); + GRPC_MDELEM_UNREF (channeld->status_ok); + GRPC_MDELEM_UNREF (channeld->status_not_found); + GRPC_MDELEM_UNREF (channeld->method_post); + GRPC_MDELEM_UNREF (channeld->http_scheme); + GRPC_MDELEM_UNREF (channeld->https_scheme); + GRPC_MDELEM_UNREF (channeld->grpc_scheme); + GRPC_MDELEM_UNREF (channeld->content_type); + GRPC_MDSTR_UNREF (channeld->path_key); + GRPC_MDSTR_UNREF (channeld->authority_key); + GRPC_MDSTR_UNREF (channeld->host_key); } const grpc_channel_filter grpc_http_server_filter = { - hs_start_transport_op, grpc_channel_next_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, - "http-server"}; + hs_start_transport_op, grpc_channel_next_op, sizeof (call_data), + init_call_elem, destroy_call_elem, sizeof (channel_data), + init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + "http-server" +}; diff --git a/src/core/channel/noop_filter.c b/src/core/channel/noop_filter.c index e4f6c7f837..a93497c509 100644 --- a/src/core/channel/noop_filter.c +++ b/src/core/channel/noop_filter.c @@ -34,25 +34,31 @@ #include "src/core/channel/noop_filter.h" #include <grpc/support/log.h> -typedef struct call_data { - int unused; /* C89 requires at least one struct element */ +typedef struct call_data +{ + int unused; /* C89 requires at least one struct element */ } call_data; -typedef struct channel_data { - int unused; /* C89 requires at least one struct element */ +typedef struct channel_data +{ + int unused; /* C89 requires at least one struct element */ } channel_data; /* used to silence 'variable not used' warnings */ -static void ignore_unused(void *ignored) {} +static void +ignore_unused (void *ignored) +{ +} -static void noop_mutate_op(grpc_call_element *elem, - grpc_transport_stream_op *op) { +static void +noop_mutate_op (grpc_call_element * elem, grpc_transport_stream_op * op) +{ /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; - ignore_unused(calld); - ignore_unused(channeld); + ignore_unused (calld); + ignore_unused (channeld); /* do nothing */ } @@ -62,20 +68,19 @@ static void noop_mutate_op(grpc_call_element *elem, - a network event (or similar) from below, to receive something op contains type and call direction information, in addition to the data that is being sent or received. */ -static void noop_start_transport_stream_op(grpc_call_element *elem, - grpc_transport_stream_op *op, - grpc_closure_list *closure_list) { - noop_mutate_op(elem, op); +static void +noop_start_transport_stream_op (grpc_call_element * elem, grpc_transport_stream_op * op, grpc_closure_list * closure_list) +{ + noop_mutate_op (elem, op); /* pass control down the stack */ - grpc_call_next_op(elem, op, closure_list); + grpc_call_next_op (elem, op, closure_list); } /* Constructor for call_data */ -static void init_call_elem(grpc_call_element *elem, - const void *server_transport_data, - grpc_transport_stream_op *initial_op, - grpc_closure_list *closure_list) { +static void +init_call_elem (grpc_call_element * elem, const void *server_transport_data, grpc_transport_stream_op * initial_op, grpc_closure_list * closure_list) +{ /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; @@ -83,47 +88,51 @@ static void init_call_elem(grpc_call_element *elem, /* initialize members */ calld->unused = channeld->unused; - if (initial_op) noop_mutate_op(elem, initial_op); + if (initial_op) + noop_mutate_op (elem, initial_op); } /* Destructor for call_data */ -static void destroy_call_elem(grpc_call_element *elem, - grpc_closure_list *closure_list) {} +static void +destroy_call_elem (grpc_call_element * elem, grpc_closure_list * closure_list) +{ +} /* Constructor for channel_data */ -static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, - const grpc_channel_args *args, grpc_mdctx *mdctx, - int is_first, int is_last, - grpc_closure_list *closure_list) { +static void +init_channel_elem (grpc_channel_element * elem, grpc_channel * master, const grpc_channel_args * args, grpc_mdctx * mdctx, int is_first, int is_last, grpc_closure_list * closure_list) +{ /* grab pointers to our data from the channel element */ channel_data *channeld = elem->channel_data; /* The first and the last filters tend to be implemented differently to handle the case that there's no 'next' filter to call on the up or down path */ - GPR_ASSERT(!is_first); - GPR_ASSERT(!is_last); + GPR_ASSERT (!is_first); + GPR_ASSERT (!is_last); /* initialize members */ channeld->unused = 0; } /* Destructor for channel data */ -static void destroy_channel_elem(grpc_channel_element *elem, - grpc_closure_list *closure_list) { +static void +destroy_channel_elem (grpc_channel_element * elem, grpc_closure_list * closure_list) +{ /* grab pointers to our data from the channel element */ channel_data *channeld = elem->channel_data; - ignore_unused(channeld); + ignore_unused (channeld); } -const grpc_channel_filter grpc_no_op_filter = {noop_start_transport_stream_op, - grpc_channel_next_op, - sizeof(call_data), - init_call_elem, - destroy_call_elem, - sizeof(channel_data), - init_channel_elem, - destroy_channel_elem, - grpc_call_next_get_peer, - "no-op"}; +const grpc_channel_filter grpc_no_op_filter = { noop_start_transport_stream_op, + grpc_channel_next_op, + sizeof (call_data), + init_call_elem, + destroy_call_elem, + sizeof (channel_data), + init_channel_elem, + destroy_channel_elem, + grpc_call_next_get_peer, + "no-op" +}; |