aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/channel
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-09-22 10:42:19 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-09-22 10:42:19 -0700
commit45724b35e411fef7c5da66a74c78428c11d56843 (patch)
tree9264034aca675c89444e02f72ef58e67d7043604 /src/core/channel
parent298751c1195523ef6228595043b583c3a6270e08 (diff)
indent pass to get logical source lines on one physical line
Diffstat (limited to 'src/core/channel')
-rw-r--r--src/core/channel/channel_args.c251
-rw-r--r--src/core/channel/channel_args.h25
-rw-r--r--src/core/channel/channel_stack.c189
-rw-r--r--src/core/channel/channel_stack.h94
-rw-r--r--src/core/channel/client_channel.c1040
-rw-r--r--src/core/channel/client_channel.h23
-rw-r--r--src/core/channel/compress_filter.c452
-rw-r--r--src/core/channel/connected_channel.c121
-rw-r--r--src/core/channel/connected_channel.h3
-rw-r--r--src/core/channel/context.h8
-rw-r--r--src/core/channel/http_client_filter.c315
-rw-r--r--src/core/channel/http_server_filter.c384
-rw-r--r--src/core/channel/noop_filter.c89
13 files changed, 1560 insertions, 1434 deletions
diff --git a/src/core/channel/channel_args.c b/src/core/channel/channel_args.c
index 487db1119a..4207a97114 100644
--- a/src/core/channel/channel_args.c
+++ b/src/core/channel/channel_args.c
@@ -41,169 +41,206 @@
#include <string.h>
-static grpc_arg copy_arg(const grpc_arg *src) {
+static grpc_arg
+copy_arg (const grpc_arg * src)
+{
grpc_arg dst;
dst.type = src->type;
- dst.key = gpr_strdup(src->key);
- switch (dst.type) {
+ dst.key = gpr_strdup (src->key);
+ switch (dst.type)
+ {
case GRPC_ARG_STRING:
- dst.value.string = gpr_strdup(src->value.string);
+ dst.value.string = gpr_strdup (src->value.string);
break;
case GRPC_ARG_INTEGER:
dst.value.integer = src->value.integer;
break;
case GRPC_ARG_POINTER:
dst.value.pointer = src->value.pointer;
- dst.value.pointer.p = src->value.pointer.copy
- ? src->value.pointer.copy(src->value.pointer.p)
- : src->value.pointer.p;
+ dst.value.pointer.p = src->value.pointer.copy ? src->value.pointer.copy (src->value.pointer.p) : src->value.pointer.p;
break;
- }
+ }
return dst;
}
-grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src,
- const grpc_arg *to_add,
- size_t num_to_add) {
- grpc_channel_args *dst = gpr_malloc(sizeof(grpc_channel_args));
+grpc_channel_args *
+grpc_channel_args_copy_and_add (const grpc_channel_args * src, const grpc_arg * to_add, size_t num_to_add)
+{
+ grpc_channel_args *dst = gpr_malloc (sizeof (grpc_channel_args));
size_t i;
size_t src_num_args = (src == NULL) ? 0 : src->num_args;
- if (!src && !to_add) {
- dst->num_args = 0;
- dst->args = NULL;
- return dst;
- }
+ if (!src && !to_add)
+ {
+ dst->num_args = 0;
+ dst->args = NULL;
+ return dst;
+ }
dst->num_args = src_num_args + num_to_add;
- dst->args = gpr_malloc(sizeof(grpc_arg) * dst->num_args);
- for (i = 0; i < src_num_args; i++) {
- dst->args[i] = copy_arg(&src->args[i]);
- }
- for (i = 0; i < num_to_add; i++) {
- dst->args[i + src_num_args] = copy_arg(&to_add[i]);
- }
+ dst->args = gpr_malloc (sizeof (grpc_arg) * dst->num_args);
+ for (i = 0; i < src_num_args; i++)
+ {
+ dst->args[i] = copy_arg (&src->args[i]);
+ }
+ for (i = 0; i < num_to_add; i++)
+ {
+ dst->args[i + src_num_args] = copy_arg (&to_add[i]);
+ }
return dst;
}
-grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src) {
- return grpc_channel_args_copy_and_add(src, NULL, 0);
+grpc_channel_args *
+grpc_channel_args_copy (const grpc_channel_args * src)
+{
+ return grpc_channel_args_copy_and_add (src, NULL, 0);
}
-grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a,
- const grpc_channel_args *b) {
- return grpc_channel_args_copy_and_add(a, b->args, b->num_args);
+grpc_channel_args *
+grpc_channel_args_merge (const grpc_channel_args * a, const grpc_channel_args * b)
+{
+ return grpc_channel_args_copy_and_add (a, b->args, b->num_args);
}
-void grpc_channel_args_destroy(grpc_channel_args *a) {
+void
+grpc_channel_args_destroy (grpc_channel_args * a)
+{
size_t i;
- for (i = 0; i < a->num_args; i++) {
- switch (a->args[i].type) {
- case GRPC_ARG_STRING:
- gpr_free(a->args[i].value.string);
- break;
- case GRPC_ARG_INTEGER:
- break;
- case GRPC_ARG_POINTER:
- if (a->args[i].value.pointer.destroy) {
- a->args[i].value.pointer.destroy(a->args[i].value.pointer.p);
- }
- break;
+ for (i = 0; i < a->num_args; i++)
+ {
+ switch (a->args[i].type)
+ {
+ case GRPC_ARG_STRING:
+ gpr_free (a->args[i].value.string);
+ break;
+ case GRPC_ARG_INTEGER:
+ break;
+ case GRPC_ARG_POINTER:
+ if (a->args[i].value.pointer.destroy)
+ {
+ a->args[i].value.pointer.destroy (a->args[i].value.pointer.p);
+ }
+ break;
+ }
+ gpr_free (a->args[i].key);
}
- gpr_free(a->args[i].key);
- }
- gpr_free(a->args);
- gpr_free(a);
+ gpr_free (a->args);
+ gpr_free (a);
}
-int grpc_channel_args_is_census_enabled(const grpc_channel_args *a) {
+int
+grpc_channel_args_is_census_enabled (const grpc_channel_args * a)
+{
size_t i;
- if (a == NULL) return 0;
- for (i = 0; i < a->num_args; i++) {
- if (0 == strcmp(a->args[i].key, GRPC_ARG_ENABLE_CENSUS)) {
- return a->args[i].value.integer != 0;
+ if (a == NULL)
+ return 0;
+ for (i = 0; i < a->num_args; i++)
+ {
+ if (0 == strcmp (a->args[i].key, GRPC_ARG_ENABLE_CENSUS))
+ {
+ return a->args[i].value.integer != 0;
+ }
}
- }
return 0;
}
-grpc_compression_algorithm grpc_channel_args_get_compression_algorithm(
- const grpc_channel_args *a) {
+grpc_compression_algorithm
+grpc_channel_args_get_compression_algorithm (const grpc_channel_args * a)
+{
size_t i;
- if (a == NULL) return 0;
- for (i = 0; i < a->num_args; ++i) {
- if (a->args[i].type == GRPC_ARG_INTEGER &&
- !strcmp(GRPC_COMPRESSION_ALGORITHM_ARG, a->args[i].key)) {
- return (grpc_compression_algorithm)a->args[i].value.integer;
- break;
+ if (a == NULL)
+ return 0;
+ for (i = 0; i < a->num_args; ++i)
+ {
+ if (a->args[i].type == GRPC_ARG_INTEGER && !strcmp (GRPC_COMPRESSION_ALGORITHM_ARG, a->args[i].key))
+ {
+ return (grpc_compression_algorithm) a->args[i].value.integer;
+ break;
+ }
}
- }
return GRPC_COMPRESS_NONE;
}
-grpc_channel_args *grpc_channel_args_set_compression_algorithm(
- grpc_channel_args *a, grpc_compression_algorithm algorithm) {
+grpc_channel_args *
+grpc_channel_args_set_compression_algorithm (grpc_channel_args * a, grpc_compression_algorithm algorithm)
+{
grpc_arg tmp;
tmp.type = GRPC_ARG_INTEGER;
tmp.key = GRPC_COMPRESSION_ALGORITHM_ARG;
tmp.value.integer = algorithm;
- return grpc_channel_args_copy_and_add(a, &tmp, 1);
+ return grpc_channel_args_copy_and_add (a, &tmp, 1);
}
/** Returns 1 if the argument for compression algorithm's enabled states bitset
* was found in \a a, returning the arg's value in \a states. Otherwise, returns
* 0. */
-static int find_compression_algorithm_states_bitset(const grpc_channel_args *a,
- int **states_arg) {
- if (a != NULL) {
- size_t i;
- for (i = 0; i < a->num_args; ++i) {
- if (a->args[i].type == GRPC_ARG_INTEGER &&
- !strcmp(GRPC_COMPRESSION_ALGORITHM_STATE_ARG, a->args[i].key)) {
- *states_arg = &a->args[i].value.integer;
- return 1; /* GPR_TRUE */
- }
+static int
+find_compression_algorithm_states_bitset (const grpc_channel_args * a, int **states_arg)
+{
+ if (a != NULL)
+ {
+ size_t i;
+ for (i = 0; i < a->num_args; ++i)
+ {
+ if (a->args[i].type == GRPC_ARG_INTEGER && !strcmp (GRPC_COMPRESSION_ALGORITHM_STATE_ARG, a->args[i].key))
+ {
+ *states_arg = &a->args[i].value.integer;
+ return 1; /* GPR_TRUE */
+ }
+ }
}
- }
- return 0; /* GPR_FALSE */
+ return 0; /* GPR_FALSE */
}
-grpc_channel_args *grpc_channel_args_compression_algorithm_set_state(
- grpc_channel_args **a, grpc_compression_algorithm algorithm, int state) {
+grpc_channel_args *
+grpc_channel_args_compression_algorithm_set_state (grpc_channel_args ** a, grpc_compression_algorithm algorithm, int state)
+{
int *states_arg;
grpc_channel_args *result = *a;
- const int states_arg_found =
- find_compression_algorithm_states_bitset(*a, &states_arg);
+ const int states_arg_found = find_compression_algorithm_states_bitset (*a, &states_arg);
- if (states_arg_found) {
- if (state != 0) {
- GPR_BITSET((unsigned *)states_arg, algorithm);
- } else {
- GPR_BITCLEAR((unsigned *)states_arg, algorithm);
+ if (states_arg_found)
+ {
+ if (state != 0)
+ {
+ GPR_BITSET ((unsigned *) states_arg, algorithm);
+ }
+ else
+ {
+ GPR_BITCLEAR ((unsigned *) states_arg, algorithm);
+ }
}
- } else {
- /* create a new arg */
- grpc_arg tmp;
- tmp.type = GRPC_ARG_INTEGER;
- tmp.key = GRPC_COMPRESSION_ALGORITHM_STATE_ARG;
- /* all enabled by default */
- tmp.value.integer = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1;
- if (state != 0) {
- GPR_BITSET((unsigned *)&tmp.value.integer, algorithm);
- } else {
- GPR_BITCLEAR((unsigned *)&tmp.value.integer, algorithm);
+ else
+ {
+ /* create a new arg */
+ grpc_arg tmp;
+ tmp.type = GRPC_ARG_INTEGER;
+ tmp.key = GRPC_COMPRESSION_ALGORITHM_STATE_ARG;
+ /* all enabled by default */
+ tmp.value.integer = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1;
+ if (state != 0)
+ {
+ GPR_BITSET ((unsigned *) &tmp.value.integer, algorithm);
+ }
+ else
+ {
+ GPR_BITCLEAR ((unsigned *) &tmp.value.integer, algorithm);
+ }
+ result = grpc_channel_args_copy_and_add (*a, &tmp, 1);
+ grpc_channel_args_destroy (*a);
+ *a = result;
}
- result = grpc_channel_args_copy_and_add(*a, &tmp, 1);
- grpc_channel_args_destroy(*a);
- *a = result;
- }
return result;
}
-int grpc_channel_args_compression_algorithm_get_states(
- const grpc_channel_args *a) {
+int
+grpc_channel_args_compression_algorithm_get_states (const grpc_channel_args * a)
+{
int *states_arg;
- if (find_compression_algorithm_states_bitset(a, &states_arg)) {
- return *states_arg;
- } else {
- return (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; /* All algs. enabled */
- }
+ if (find_compression_algorithm_states_bitset (a, &states_arg))
+ {
+ return *states_arg;
+ }
+ else
+ {
+ return (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; /* All algs. enabled */
+ }
}
diff --git a/src/core/channel/channel_args.h b/src/core/channel/channel_args.h
index f9e7b05860..bcfd8c46a0 100644
--- a/src/core/channel/channel_args.h
+++ b/src/core/channel/channel_args.h
@@ -38,34 +38,29 @@
#include <grpc/grpc.h>
/* Copy some arguments */
-grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src);
+grpc_channel_args *grpc_channel_args_copy (const grpc_channel_args * src);
/** Copy some arguments and add the to_add parameter in the end.
If to_add is NULL, it is equivalent to call grpc_channel_args_copy. */
-grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src,
- const grpc_arg *to_add,
- size_t num_to_add);
+grpc_channel_args *grpc_channel_args_copy_and_add (const grpc_channel_args * src, const grpc_arg * to_add, size_t num_to_add);
/** Copy args from a then args from b into a new channel args */
-grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a,
- const grpc_channel_args *b);
+grpc_channel_args *grpc_channel_args_merge (const grpc_channel_args * a, const grpc_channel_args * b);
/** Destroy arguments created by grpc_channel_args_copy */
-void grpc_channel_args_destroy(grpc_channel_args *a);
+void grpc_channel_args_destroy (grpc_channel_args * a);
/** Reads census_enabled settings from channel args. Returns 1 if census_enabled
* is specified in channel args, otherwise returns 0. */
-int grpc_channel_args_is_census_enabled(const grpc_channel_args *a);
+int grpc_channel_args_is_census_enabled (const grpc_channel_args * a);
/** Returns the compression algorithm set in \a a. */
-grpc_compression_algorithm grpc_channel_args_get_compression_algorithm(
- const grpc_channel_args *a);
+grpc_compression_algorithm grpc_channel_args_get_compression_algorithm (const grpc_channel_args * a);
/** Returns a channel arg instance with compression enabled. If \a a is
* non-NULL, its args are copied. N.B. GRPC_COMPRESS_NONE disables compression
* for the channel. */
-grpc_channel_args *grpc_channel_args_set_compression_algorithm(
- grpc_channel_args *a, grpc_compression_algorithm algorithm);
+grpc_channel_args *grpc_channel_args_set_compression_algorithm (grpc_channel_args * a, grpc_compression_algorithm algorithm);
/** Sets the support for the given compression algorithm. By default, all
* compression algorithms are enabled. It's an error to disable an algorithm set
@@ -74,15 +69,13 @@ grpc_channel_args *grpc_channel_args_set_compression_algorithm(
* Returns an instance will the updated algorithm states. The \a a pointer is
* modified to point to the returned instance (which may be different from the
* input value of \a a). */
-grpc_channel_args *grpc_channel_args_compression_algorithm_set_state(
- grpc_channel_args **a, grpc_compression_algorithm algorithm, int enabled);
+grpc_channel_args *grpc_channel_args_compression_algorithm_set_state (grpc_channel_args ** a, grpc_compression_algorithm algorithm, int enabled);
/** Returns the bitset representing the support state (true for enabled, false
* for disabled) for compression algorithms.
*
* The i-th bit of the returned bitset corresponds to the i-th entry in the
* grpc_compression_algorithm enum. */
-int grpc_channel_args_compression_algorithm_get_states(
- const grpc_channel_args *a);
+int grpc_channel_args_compression_algorithm_get_states (const grpc_channel_args * a);
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_ARGS_H */
diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c
index 47f1f8e828..aafc0ede9c 100644
--- a/src/core/channel/channel_stack.c
+++ b/src/core/channel/channel_stack.c
@@ -59,21 +59,20 @@ int grpc_trace_channel = 0;
#define ROUND_UP_TO_ALIGNMENT_SIZE(x) \
(((x) + GPR_MAX_ALIGNMENT - 1u) & ~(GPR_MAX_ALIGNMENT - 1u))
-size_t grpc_channel_stack_size(const grpc_channel_filter **filters,
- size_t filter_count) {
+size_t
+grpc_channel_stack_size (const grpc_channel_filter ** filters, size_t filter_count)
+{
/* always need the header, and size for the channel elements */
- size_t size =
- ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_channel_stack)) +
- ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_channel_element));
+ size_t size = ROUND_UP_TO_ALIGNMENT_SIZE (sizeof (grpc_channel_stack)) + ROUND_UP_TO_ALIGNMENT_SIZE (filter_count * sizeof (grpc_channel_element));
size_t i;
- GPR_ASSERT((GPR_MAX_ALIGNMENT & (GPR_MAX_ALIGNMENT - 1)) == 0 &&
- "GPR_MAX_ALIGNMENT must be a power of two");
+ GPR_ASSERT ((GPR_MAX_ALIGNMENT & (GPR_MAX_ALIGNMENT - 1)) == 0 && "GPR_MAX_ALIGNMENT must be a power of two");
/* add the size for each filter */
- for (i = 0; i < filter_count; i++) {
- size += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_channel_data);
- }
+ for (i = 0; i < filter_count; i++)
+ {
+ size += ROUND_UP_TO_ALIGNMENT_SIZE (filters[i]->sizeof_channel_data);
+ }
return size;
}
@@ -86,144 +85,142 @@ size_t grpc_channel_stack_size(const grpc_channel_filter **filters,
((grpc_call_element *)((char *)(stk) + \
ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack))))
-grpc_channel_element *grpc_channel_stack_element(
- grpc_channel_stack *channel_stack, size_t index) {
- return CHANNEL_ELEMS_FROM_STACK(channel_stack) + index;
+grpc_channel_element *
+grpc_channel_stack_element (grpc_channel_stack * channel_stack, size_t index)
+{
+ return CHANNEL_ELEMS_FROM_STACK (channel_stack) + index;
}
-grpc_channel_element *grpc_channel_stack_last_element(
- grpc_channel_stack *channel_stack) {
- return grpc_channel_stack_element(channel_stack, channel_stack->count - 1);
+grpc_channel_element *
+grpc_channel_stack_last_element (grpc_channel_stack * channel_stack)
+{
+ return grpc_channel_stack_element (channel_stack, channel_stack->count - 1);
}
-grpc_call_element *grpc_call_stack_element(grpc_call_stack *call_stack,
- size_t index) {
- return CALL_ELEMS_FROM_STACK(call_stack) + index;
+grpc_call_element *
+grpc_call_stack_element (grpc_call_stack * call_stack, size_t index)
+{
+ return CALL_ELEMS_FROM_STACK (call_stack) + index;
}
-void grpc_channel_stack_init(const grpc_channel_filter **filters,
- size_t filter_count, grpc_channel *master,
- const grpc_channel_args *args,
- grpc_mdctx *metadata_context,
- grpc_channel_stack *stack,
- grpc_closure_list *closure_list) {
- size_t call_size =
- ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) +
- ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_call_element));
+void
+grpc_channel_stack_init (const grpc_channel_filter ** filters, size_t filter_count, grpc_channel * master, const grpc_channel_args * args, grpc_mdctx * metadata_context, grpc_channel_stack * stack, grpc_closure_list * closure_list)
+{
+ size_t call_size = ROUND_UP_TO_ALIGNMENT_SIZE (sizeof (grpc_call_stack)) + ROUND_UP_TO_ALIGNMENT_SIZE (filter_count * sizeof (grpc_call_element));
grpc_channel_element *elems;
char *user_data;
size_t i;
stack->count = filter_count;
- elems = CHANNEL_ELEMS_FROM_STACK(stack);
- user_data =
- ((char *)elems) +
- ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_channel_element));
+ elems = CHANNEL_ELEMS_FROM_STACK (stack);
+ user_data = ((char *) elems) + ROUND_UP_TO_ALIGNMENT_SIZE (filter_count * sizeof (grpc_channel_element));
/* init per-filter data */
- for (i = 0; i < filter_count; i++) {
- elems[i].filter = filters[i];
- elems[i].channel_data = user_data;
- elems[i].filter->init_channel_elem(&elems[i], master, args,
- metadata_context, i == 0,
- i == (filter_count - 1), closure_list);
- user_data += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_channel_data);
- call_size += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_call_data);
- }
-
- GPR_ASSERT(user_data > (char *)stack);
- GPR_ASSERT((gpr_uintptr)(user_data - (char *)stack) ==
- grpc_channel_stack_size(filters, filter_count));
+ for (i = 0; i < filter_count; i++)
+ {
+ elems[i].filter = filters[i];
+ elems[i].channel_data = user_data;
+ elems[i].filter->init_channel_elem (&elems[i], master, args, metadata_context, i == 0, i == (filter_count - 1), closure_list);
+ user_data += ROUND_UP_TO_ALIGNMENT_SIZE (filters[i]->sizeof_channel_data);
+ call_size += ROUND_UP_TO_ALIGNMENT_SIZE (filters[i]->sizeof_call_data);
+ }
+
+ GPR_ASSERT (user_data > (char *) stack);
+ GPR_ASSERT ((gpr_uintptr) (user_data - (char *) stack) == grpc_channel_stack_size (filters, filter_count));
stack->call_stack_size = call_size;
}
-void grpc_channel_stack_destroy(grpc_channel_stack *stack,
- grpc_closure_list *closure_list) {
- grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(stack);
+void
+grpc_channel_stack_destroy (grpc_channel_stack * stack, grpc_closure_list * closure_list)
+{
+ grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK (stack);
size_t count = stack->count;
size_t i;
/* destroy per-filter data */
- for (i = 0; i < count; i++) {
- channel_elems[i].filter->destroy_channel_elem(&channel_elems[i],
- closure_list);
- }
+ for (i = 0; i < count; i++)
+ {
+ channel_elems[i].filter->destroy_channel_elem (&channel_elems[i], closure_list);
+ }
}
-void grpc_call_stack_init(grpc_channel_stack *channel_stack,
- const void *transport_server_data,
- grpc_transport_stream_op *initial_op,
- grpc_call_stack *call_stack,
- grpc_closure_list *closure_list) {
- grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
+void
+grpc_call_stack_init (grpc_channel_stack * channel_stack, const void *transport_server_data, grpc_transport_stream_op * initial_op, grpc_call_stack * call_stack, grpc_closure_list * closure_list)
+{
+ grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK (channel_stack);
size_t count = channel_stack->count;
grpc_call_element *call_elems;
char *user_data;
size_t i;
call_stack->count = count;
- call_elems = CALL_ELEMS_FROM_STACK(call_stack);
- user_data = ((char *)call_elems) +
- ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element));
+ call_elems = CALL_ELEMS_FROM_STACK (call_stack);
+ user_data = ((char *) call_elems) + ROUND_UP_TO_ALIGNMENT_SIZE (count * sizeof (grpc_call_element));
/* init per-filter data */
- for (i = 0; i < count; i++) {
- call_elems[i].filter = channel_elems[i].filter;
- call_elems[i].channel_data = channel_elems[i].channel_data;
- call_elems[i].call_data = user_data;
- call_elems[i].filter->init_call_elem(&call_elems[i], transport_server_data,
- initial_op, closure_list);
- user_data +=
- ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data);
- }
+ for (i = 0; i < count; i++)
+ {
+ call_elems[i].filter = channel_elems[i].filter;
+ call_elems[i].channel_data = channel_elems[i].channel_data;
+ call_elems[i].call_data = user_data;
+ call_elems[i].filter->init_call_elem (&call_elems[i], transport_server_data, initial_op, closure_list);
+ user_data += ROUND_UP_TO_ALIGNMENT_SIZE (call_elems[i].filter->sizeof_call_data);
+ }
}
-void grpc_call_stack_destroy(grpc_call_stack *stack,
- grpc_closure_list *closure_list) {
- grpc_call_element *elems = CALL_ELEMS_FROM_STACK(stack);
+void
+grpc_call_stack_destroy (grpc_call_stack * stack, grpc_closure_list * closure_list)
+{
+ grpc_call_element *elems = CALL_ELEMS_FROM_STACK (stack);
size_t count = stack->count;
size_t i;
/* destroy per-filter data */
- for (i = 0; i < count; i++) {
- elems[i].filter->destroy_call_elem(&elems[i], closure_list);
- }
+ for (i = 0; i < count; i++)
+ {
+ elems[i].filter->destroy_call_elem (&elems[i], closure_list);
+ }
}
-void grpc_call_next_op(grpc_call_element *elem, grpc_transport_stream_op *op,
- grpc_closure_list *closure_list) {
+void
+grpc_call_next_op (grpc_call_element * elem, grpc_transport_stream_op * op, grpc_closure_list * closure_list)
+{
grpc_call_element *next_elem = elem + 1;
- next_elem->filter->start_transport_stream_op(next_elem, op, closure_list);
+ next_elem->filter->start_transport_stream_op (next_elem, op, closure_list);
}
-char *grpc_call_next_get_peer(grpc_call_element *elem,
- grpc_closure_list *closure_list) {
+char *
+grpc_call_next_get_peer (grpc_call_element * elem, grpc_closure_list * closure_list)
+{
grpc_call_element *next_elem = elem + 1;
- return next_elem->filter->get_peer(next_elem, closure_list);
+ return next_elem->filter->get_peer (next_elem, closure_list);
}
-void grpc_channel_next_op(grpc_channel_element *elem, grpc_transport_op *op,
- grpc_closure_list *closure_list) {
+void
+grpc_channel_next_op (grpc_channel_element * elem, grpc_transport_op * op, grpc_closure_list * closure_list)
+{
grpc_channel_element *next_elem = elem + 1;
- next_elem->filter->start_transport_op(next_elem, op, closure_list);
+ next_elem->filter->start_transport_op (next_elem, op, closure_list);
}
-grpc_channel_stack *grpc_channel_stack_from_top_element(
- grpc_channel_element *elem) {
- return (grpc_channel_stack *)((char *)(elem)-ROUND_UP_TO_ALIGNMENT_SIZE(
- sizeof(grpc_channel_stack)));
+grpc_channel_stack *
+grpc_channel_stack_from_top_element (grpc_channel_element * elem)
+{
+ return (grpc_channel_stack *) ((char *) (elem) - ROUND_UP_TO_ALIGNMENT_SIZE (sizeof (grpc_channel_stack)));
}
-grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) {
- return (grpc_call_stack *)((char *)(elem)-ROUND_UP_TO_ALIGNMENT_SIZE(
- sizeof(grpc_call_stack)));
+grpc_call_stack *
+grpc_call_stack_from_top_element (grpc_call_element * elem)
+{
+ return (grpc_call_stack *) ((char *) (elem) - ROUND_UP_TO_ALIGNMENT_SIZE (sizeof (grpc_call_stack)));
}
-void grpc_call_element_send_cancel(grpc_call_element *cur_elem,
- grpc_closure_list *closure_list) {
+void
+grpc_call_element_send_cancel (grpc_call_element * cur_elem, grpc_closure_list * closure_list)
+{
grpc_transport_stream_op op;
- memset(&op, 0, sizeof(op));
+ memset (&op, 0, sizeof (op));
op.cancel_with_status = GRPC_STATUS_CANCELLED;
- grpc_call_next_op(cur_elem, &op, closure_list);
+ grpc_call_next_op (cur_elem, &op, closure_list);
}
diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h
index b3facd4f01..937bb04eda 100644
--- a/src/core/channel/channel_stack.h
+++ b/src/core/channel/channel_stack.h
@@ -61,17 +61,15 @@ typedef struct grpc_call_element grpc_call_element;
4. a name, which is useful when debugging
Members are laid out in approximate frequency of use order. */
-typedef struct {
+typedef struct
+{
/* Called to eg. send/receive data on a call.
See grpc_call_next_op on how to call the next element in the stack */
- void (*start_transport_stream_op)(grpc_call_element *elem,
- grpc_transport_stream_op *op,
- grpc_closure_list *closure_list);
+ void (*start_transport_stream_op) (grpc_call_element * elem, grpc_transport_stream_op * op, grpc_closure_list * closure_list);
/* Called to handle channel level operations - e.g. new calls, or transport
closure.
See grpc_channel_next_op on how to call the next element in the stack */
- void (*start_transport_op)(grpc_channel_element *elem, grpc_transport_op *op,
- grpc_closure_list *closure_list);
+ void (*start_transport_op) (grpc_channel_element * elem, grpc_transport_op * op, grpc_closure_list * closure_list);
/* sizeof(per call data) */
size_t sizeof_call_data;
@@ -82,15 +80,11 @@ typedef struct {
server_transport_data is an opaque pointer. If it is NULL, this call is
on a client; if it is non-NULL, then it points to memory owned by the
transport and is on the server. Most filters want to ignore this
- argument.*/
- void (*init_call_elem)(grpc_call_element *elem,
- const void *server_transport_data,
- grpc_transport_stream_op *initial_op,
- grpc_closure_list *closure_list);
+ argument. */
+ void (*init_call_elem) (grpc_call_element * elem, const void *server_transport_data, grpc_transport_stream_op * initial_op, grpc_closure_list * closure_list);
/* Destroy per call data.
The filter does not need to do any chaining */
- void (*destroy_call_elem)(grpc_call_element *elem,
- grpc_closure_list *closure_list);
+ void (*destroy_call_elem) (grpc_call_element * elem, grpc_closure_list * closure_list);
/* sizeof(per channel data) */
size_t sizeof_channel_data;
@@ -100,17 +94,13 @@ typedef struct {
is_first, is_last designate this elements position in the stack, and are
useful for asserting correct configuration by upper layer code.
The filter does not need to do any chaining */
- void (*init_channel_elem)(grpc_channel_element *elem, grpc_channel *master,
- const grpc_channel_args *args,
- grpc_mdctx *metadata_context, int is_first,
- int is_last, grpc_closure_list *closure_list);
+ void (*init_channel_elem) (grpc_channel_element * elem, grpc_channel * master, const grpc_channel_args * args, grpc_mdctx * metadata_context, int is_first, int is_last, grpc_closure_list * closure_list);
/* Destroy per channel data.
The filter does not need to do any chaining */
- void (*destroy_channel_elem)(grpc_channel_element *elem,
- grpc_closure_list *closure_list);
+ void (*destroy_channel_elem) (grpc_channel_element * elem, grpc_closure_list * closure_list);
/* Implement grpc_call_get_peer() */
- char *(*get_peer)(grpc_call_element *elem, grpc_closure_list *closure_list);
+ char *(*get_peer) (grpc_call_element * elem, grpc_closure_list * closure_list);
/* The name of this filter */
const char *name;
@@ -118,7 +108,8 @@ typedef struct {
/* A channel_element tracks its filter and the filter requested memory within
a channel allocation */
-struct grpc_channel_element {
+struct grpc_channel_element
+{
const grpc_channel_filter *filter;
void *channel_data;
};
@@ -126,7 +117,8 @@ struct grpc_channel_element {
/* A call_element tracks its filter, the filter requested memory within
a channel allocation, and the filter requested memory within a call
allocation */
-struct grpc_call_element {
+struct grpc_call_element
+{
const grpc_channel_filter *filter;
void *channel_data;
void *call_data;
@@ -134,7 +126,8 @@ struct grpc_call_element {
/* A channel stack tracks a set of related filters for one channel, and
guarantees they live within a single malloc() allocation */
-typedef struct {
+typedef struct
+{
size_t count;
/* Memory required for a call stack (computed at channel stack
initialization) */
@@ -143,65 +136,48 @@ typedef struct {
/* A call stack tracks a set of related filters for one call, and guarantees
they live within a single malloc() allocation */
-typedef struct { size_t count; } grpc_call_stack;
+typedef struct
+{
+ size_t count;
+} grpc_call_stack;
/* Get a channel element given a channel stack and its index */
-grpc_channel_element *grpc_channel_stack_element(grpc_channel_stack *stack,
- size_t i);
+grpc_channel_element *grpc_channel_stack_element (grpc_channel_stack * stack, size_t i);
/* Get the last channel element in a channel stack */
-grpc_channel_element *grpc_channel_stack_last_element(
- grpc_channel_stack *stack);
+grpc_channel_element *grpc_channel_stack_last_element (grpc_channel_stack * stack);
/* Get a call stack element given a call stack and an index */
-grpc_call_element *grpc_call_stack_element(grpc_call_stack *stack, size_t i);
+grpc_call_element *grpc_call_stack_element (grpc_call_stack * stack, size_t i);
/* Determine memory required for a channel stack containing a set of filters */
-size_t grpc_channel_stack_size(const grpc_channel_filter **filters,
- size_t filter_count);
+size_t grpc_channel_stack_size (const grpc_channel_filter ** filters, size_t filter_count);
/* Initialize a channel stack given some filters */
-void grpc_channel_stack_init(const grpc_channel_filter **filters,
- size_t filter_count, grpc_channel *master,
- const grpc_channel_args *args,
- grpc_mdctx *metadata_context,
- grpc_channel_stack *stack,
- grpc_closure_list *closure_list);
+void grpc_channel_stack_init (const grpc_channel_filter ** filters, size_t filter_count, grpc_channel * master, const grpc_channel_args * args, grpc_mdctx * metadata_context, grpc_channel_stack * stack, grpc_closure_list * closure_list);
/* Destroy a channel stack */
-void grpc_channel_stack_destroy(grpc_channel_stack *stack,
- grpc_closure_list *closure_list);
+void grpc_channel_stack_destroy (grpc_channel_stack * stack, grpc_closure_list * closure_list);
/* Initialize a call stack given a channel stack. transport_server_data is
expected to be NULL on a client, or an opaque transport owned pointer on the
server. */
-void grpc_call_stack_init(grpc_channel_stack *channel_stack,
- const void *transport_server_data,
- grpc_transport_stream_op *initial_op,
- grpc_call_stack *call_stack,
- grpc_closure_list *closure_list);
+void grpc_call_stack_init (grpc_channel_stack * channel_stack, const void *transport_server_data, grpc_transport_stream_op * initial_op, grpc_call_stack * call_stack, grpc_closure_list * closure_list);
/* Destroy a call stack */
-void grpc_call_stack_destroy(grpc_call_stack *stack,
- grpc_closure_list *closure_list);
+void grpc_call_stack_destroy (grpc_call_stack * stack, grpc_closure_list * closure_list);
/* Call the next operation in a call stack */
-void grpc_call_next_op(grpc_call_element *elem, grpc_transport_stream_op *op,
- grpc_closure_list *closure_list);
+void grpc_call_next_op (grpc_call_element * elem, grpc_transport_stream_op * op, grpc_closure_list * closure_list);
/* Call the next operation (depending on call directionality) in a channel
stack */
-void grpc_channel_next_op(grpc_channel_element *elem, grpc_transport_op *op,
- grpc_closure_list *closure_list);
+void grpc_channel_next_op (grpc_channel_element * elem, grpc_transport_op * op, grpc_closure_list * closure_list);
/* Pass through a request to get_peer to the next child element */
-char *grpc_call_next_get_peer(grpc_call_element *elem,
- grpc_closure_list *closure_list);
+char *grpc_call_next_get_peer (grpc_call_element * elem, grpc_closure_list * closure_list);
/* Given the top element of a channel stack, get the channel stack itself */
-grpc_channel_stack *grpc_channel_stack_from_top_element(
- grpc_channel_element *elem);
+grpc_channel_stack *grpc_channel_stack_from_top_element (grpc_channel_element * elem);
/* Given the top element of a call stack, get the call stack itself */
-grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem);
+grpc_call_stack *grpc_call_stack_from_top_element (grpc_call_element * elem);
-void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
- grpc_call_element *elem, grpc_transport_stream_op *op);
+void grpc_call_log_op (char *file, int line, gpr_log_severity severity, grpc_call_element * elem, grpc_transport_stream_op * op);
-void grpc_call_element_send_cancel(grpc_call_element *cur_elem,
- grpc_closure_list *closure_list);
+void grpc_call_element_send_cancel (grpc_call_element * cur_elem, grpc_closure_list * closure_list);
extern int grpc_trace_channel;
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 903f9eab50..40b428cf3e 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -51,7 +51,8 @@
typedef struct call_data call_data;
-typedef struct {
+typedef struct
+{
/** metadata context for this channel */
grpc_mdctx *mdctx;
/** resolver for this channel */
@@ -89,14 +90,16 @@ typedef struct {
to watch for state changes from the lb_policy. When a state change is seen,
we
update the channel, and create a new watcher */
-typedef struct {
+typedef struct
+{
channel_data *chand;
grpc_closure on_changed;
grpc_connectivity_state state;
grpc_lb_policy *lb_policy;
} lb_policy_connectivity_watcher;
-typedef enum {
+typedef enum
+{
CALL_CREATED,
CALL_WAITING_FOR_SEND,
CALL_WAITING_FOR_CONFIG,
@@ -106,7 +109,8 @@ typedef enum {
CALL_CANCELLED
} call_state;
-struct call_data {
+struct call_data
+{
/* owning element */
grpc_call_element *elem;
@@ -123,367 +127,406 @@ struct call_data {
grpc_linked_mdelem details;
};
-static grpc_closure *merge_into_waiting_op(grpc_call_element *elem,
- grpc_transport_stream_op *new_op)
- GRPC_MUST_USE_RESULT;
+static grpc_closure *
+merge_into_waiting_op (grpc_call_element * elem, grpc_transport_stream_op * new_op)
+ GRPC_MUST_USE_RESULT;
-static void handle_op_after_cancellation(grpc_call_element *elem,
- grpc_transport_stream_op *op,
- grpc_closure_list *closure_list) {
+ static void handle_op_after_cancellation (grpc_call_element * elem, grpc_transport_stream_op * op, grpc_closure_list * closure_list)
+{
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- if (op->send_ops) {
- grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops);
- op->on_done_send->cb(op->on_done_send->cb_arg, 0, closure_list);
- }
- if (op->recv_ops) {
- char status[GPR_LTOA_MIN_BUFSIZE];
- grpc_metadata_batch mdb;
- gpr_ltoa(GRPC_STATUS_CANCELLED, status);
- calld->status.md =
- grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status);
- calld->details.md =
- grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled");
- calld->status.prev = calld->details.next = NULL;
- calld->status.next = &calld->details;
- calld->details.prev = &calld->status;
- mdb.list.head = &calld->status;
- mdb.list.tail = &calld->details;
- mdb.garbage.head = mdb.garbage.tail = NULL;
- mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
- grpc_sopb_add_metadata(op->recv_ops, mdb);
- *op->recv_state = GRPC_STREAM_CLOSED;
- op->on_done_recv->cb(op->on_done_recv->cb_arg, 1, closure_list);
- }
- if (op->on_consumed) {
- op->on_consumed->cb(op->on_consumed->cb_arg, 0, closure_list);
- }
+ if (op->send_ops)
+ {
+ grpc_stream_ops_unref_owned_objects (op->send_ops->ops, op->send_ops->nops);
+ op->on_done_send->cb (op->on_done_send->cb_arg, 0, closure_list);
+ }
+ if (op->recv_ops)
+ {
+ char status[GPR_LTOA_MIN_BUFSIZE];
+ grpc_metadata_batch mdb;
+ gpr_ltoa (GRPC_STATUS_CANCELLED, status);
+ calld->status.md = grpc_mdelem_from_strings (chand->mdctx, "grpc-status", status);
+ calld->details.md = grpc_mdelem_from_strings (chand->mdctx, "grpc-message", "Cancelled");
+ calld->status.prev = calld->details.next = NULL;
+ calld->status.next = &calld->details;
+ calld->details.prev = &calld->status;
+ mdb.list.head = &calld->status;
+ mdb.list.tail = &calld->details;
+ mdb.garbage.head = mdb.garbage.tail = NULL;
+ mdb.deadline = gpr_inf_future (GPR_CLOCK_REALTIME);
+ grpc_sopb_add_metadata (op->recv_ops, mdb);
+ *op->recv_state = GRPC_STREAM_CLOSED;
+ op->on_done_recv->cb (op->on_done_recv->cb_arg, 1, closure_list);
+ }
+ if (op->on_consumed)
+ {
+ op->on_consumed->cb (op->on_consumed->cb_arg, 0, closure_list);
+ }
}
-typedef struct {
+typedef struct
+{
grpc_closure closure;
grpc_call_element *elem;
} waiting_call;
-static void perform_transport_stream_op(grpc_call_element *elem,
- grpc_transport_stream_op *op,
- int continuation,
- grpc_closure_list *closure_list);
+static void perform_transport_stream_op (grpc_call_element * elem, grpc_transport_stream_op * op, int continuation, grpc_closure_list * closure_list);
-static void continue_with_pick(void *arg, int iomgr_success,
- grpc_closure_list *closure_list) {
+static void
+continue_with_pick (void *arg, int iomgr_success, grpc_closure_list * closure_list)
+{
waiting_call *wc = arg;
call_data *calld = wc->elem->call_data;
- perform_transport_stream_op(wc->elem, &calld->waiting_op, 1, closure_list);
- gpr_free(wc);
+ perform_transport_stream_op (wc->elem, &calld->waiting_op, 1, closure_list);
+ gpr_free (wc);
}
-static void add_to_lb_policy_wait_queue_locked_state_config(
- grpc_call_element *elem) {
+static void
+add_to_lb_policy_wait_queue_locked_state_config (grpc_call_element * elem)
+{
channel_data *chand = elem->channel_data;
- waiting_call *wc = gpr_malloc(sizeof(*wc));
- grpc_closure_init(&wc->closure, continue_with_pick, wc);
+ waiting_call *wc = gpr_malloc (sizeof (*wc));
+ grpc_closure_init (&wc->closure, continue_with_pick, wc);
wc->elem = elem;
- grpc_closure_list_add(&chand->waiting_for_config_closures, &wc->closure, 1);
+ grpc_closure_list_add (&chand->waiting_for_config_closures, &wc->closure, 1);
}
-static int is_empty(void *p, int len) {
+static int
+is_empty (void *p, int len)
+{
char *ptr = p;
int i;
- for (i = 0; i < len; i++) {
- if (ptr[i] != 0) return 0;
- }
+ for (i = 0; i < len; i++)
+ {
+ if (ptr[i] != 0)
+ return 0;
+ }
return 1;
}
-static void started_call(void *arg, int iomgr_success,
- grpc_closure_list *closure_list) {
+static void
+started_call (void *arg, int iomgr_success, grpc_closure_list * closure_list)
+{
call_data *calld = arg;
grpc_transport_stream_op op;
int have_waiting;
- gpr_mu_lock(&calld->mu_state);
- if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) {
- memset(&op, 0, sizeof(op));
- op.cancel_with_status = GRPC_STATUS_CANCELLED;
- gpr_mu_unlock(&calld->mu_state);
- grpc_subchannel_call_process_op(calld->subchannel_call, &op, closure_list);
- } else if (calld->state == CALL_WAITING_FOR_CALL) {
- have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op));
- if (calld->subchannel_call != NULL) {
- calld->state = CALL_ACTIVE;
- gpr_mu_unlock(&calld->mu_state);
- if (have_waiting) {
- grpc_subchannel_call_process_op(calld->subchannel_call,
- &calld->waiting_op, closure_list);
- }
- } else {
- calld->state = CALL_CANCELLED;
- gpr_mu_unlock(&calld->mu_state);
- if (have_waiting) {
- handle_op_after_cancellation(calld->elem, &calld->waiting_op,
- closure_list);
- }
- }
- } else {
- GPR_ASSERT(calld->state == CALL_CANCELLED);
- gpr_mu_unlock(&calld->mu_state);
- }
+ gpr_mu_lock (&calld->mu_state);
+ if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL)
+ {
+ memset (&op, 0, sizeof (op));
+ op.cancel_with_status = GRPC_STATUS_CANCELLED;
+ gpr_mu_unlock (&calld->mu_state);
+ grpc_subchannel_call_process_op (calld->subchannel_call, &op, closure_list);
+ }
+ else if (calld->state == CALL_WAITING_FOR_CALL)
+ {
+ have_waiting = !is_empty (&calld->waiting_op, sizeof (calld->waiting_op));
+ if (calld->subchannel_call != NULL)
+ {
+ calld->state = CALL_ACTIVE;
+ gpr_mu_unlock (&calld->mu_state);
+ if (have_waiting)
+ {
+ grpc_subchannel_call_process_op (calld->subchannel_call, &calld->waiting_op, closure_list);
+ }
+ }
+ else
+ {
+ calld->state = CALL_CANCELLED;
+ gpr_mu_unlock (&calld->mu_state);
+ if (have_waiting)
+ {
+ handle_op_after_cancellation (calld->elem, &calld->waiting_op, closure_list);
+ }
+ }
+ }
+ else
+ {
+ GPR_ASSERT (calld->state == CALL_CANCELLED);
+ gpr_mu_unlock (&calld->mu_state);
+ }
}
-static void picked_target(void *arg, int iomgr_success,
- grpc_closure_list *closure_list) {
+static void
+picked_target (void *arg, int iomgr_success, grpc_closure_list * closure_list)
+{
call_data *calld = arg;
grpc_pollset *pollset;
- if (calld->picked_channel == NULL) {
- /* treat this like a cancellation */
- calld->waiting_op.cancel_with_status = GRPC_STATUS_UNAVAILABLE;
- perform_transport_stream_op(calld->elem, &calld->waiting_op, 1,
- closure_list);
- } else {
- gpr_mu_lock(&calld->mu_state);
- if (calld->state == CALL_CANCELLED) {
- gpr_mu_unlock(&calld->mu_state);
- handle_op_after_cancellation(calld->elem, &calld->waiting_op,
- closure_list);
- } else {
- GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK);
- calld->state = CALL_WAITING_FOR_CALL;
- pollset = calld->waiting_op.bind_pollset;
- gpr_mu_unlock(&calld->mu_state);
- grpc_closure_init(&calld->async_setup_task, started_call, calld);
- grpc_subchannel_create_call(calld->picked_channel, pollset,
- &calld->subchannel_call,
- &calld->async_setup_task, closure_list);
- }
- }
+ if (calld->picked_channel == NULL)
+ {
+ /* treat this like a cancellation */
+ calld->waiting_op.cancel_with_status = GRPC_STATUS_UNAVAILABLE;
+ perform_transport_stream_op (calld->elem, &calld->waiting_op, 1, closure_list);
+ }
+ else
+ {
+ gpr_mu_lock (&calld->mu_state);
+ if (calld->state == CALL_CANCELLED)
+ {
+ gpr_mu_unlock (&calld->mu_state);
+ handle_op_after_cancellation (calld->elem, &calld->waiting_op, closure_list);
+ }
+ else
+ {
+ GPR_ASSERT (calld->state == CALL_WAITING_FOR_PICK);
+ calld->state = CALL_WAITING_FOR_CALL;
+ pollset = calld->waiting_op.bind_pollset;
+ gpr_mu_unlock (&calld->mu_state);
+ grpc_closure_init (&calld->async_setup_task, started_call, calld);
+ grpc_subchannel_create_call (calld->picked_channel, pollset, &calld->subchannel_call, &calld->async_setup_task, closure_list);
+ }
+ }
}
-static grpc_closure *merge_into_waiting_op(grpc_call_element *elem,
- grpc_transport_stream_op *new_op) {
+static grpc_closure *
+merge_into_waiting_op (grpc_call_element * elem, grpc_transport_stream_op * new_op)
+{
call_data *calld = elem->call_data;
grpc_closure *consumed_op = NULL;
grpc_transport_stream_op *waiting_op = &calld->waiting_op;
- GPR_ASSERT((waiting_op->send_ops != NULL) + (new_op->send_ops != NULL) <= 1);
- GPR_ASSERT((waiting_op->recv_ops != NULL) + (new_op->recv_ops != NULL) <= 1);
- if (new_op->send_ops != NULL) {
- waiting_op->send_ops = new_op->send_ops;
- waiting_op->is_last_send = new_op->is_last_send;
- waiting_op->on_done_send = new_op->on_done_send;
- }
- if (new_op->recv_ops != NULL) {
- waiting_op->recv_ops = new_op->recv_ops;
- waiting_op->recv_state = new_op->recv_state;
- waiting_op->on_done_recv = new_op->on_done_recv;
- }
- if (new_op->on_consumed != NULL) {
- if (waiting_op->on_consumed != NULL) {
- consumed_op = waiting_op->on_consumed;
- }
- waiting_op->on_consumed = new_op->on_consumed;
- }
- if (new_op->cancel_with_status != GRPC_STATUS_OK) {
- waiting_op->cancel_with_status = new_op->cancel_with_status;
- }
+ GPR_ASSERT ((waiting_op->send_ops != NULL) + (new_op->send_ops != NULL) <= 1);
+ GPR_ASSERT ((waiting_op->recv_ops != NULL) + (new_op->recv_ops != NULL) <= 1);
+ if (new_op->send_ops != NULL)
+ {
+ waiting_op->send_ops = new_op->send_ops;
+ waiting_op->is_last_send = new_op->is_last_send;
+ waiting_op->on_done_send = new_op->on_done_send;
+ }
+ if (new_op->recv_ops != NULL)
+ {
+ waiting_op->recv_ops = new_op->recv_ops;
+ waiting_op->recv_state = new_op->recv_state;
+ waiting_op->on_done_recv = new_op->on_done_recv;
+ }
+ if (new_op->on_consumed != NULL)
+ {
+ if (waiting_op->on_consumed != NULL)
+ {
+ consumed_op = waiting_op->on_consumed;
+ }
+ waiting_op->on_consumed = new_op->on_consumed;
+ }
+ if (new_op->cancel_with_status != GRPC_STATUS_OK)
+ {
+ waiting_op->cancel_with_status = new_op->cancel_with_status;
+ }
return consumed_op;
}
-static char *cc_get_peer(grpc_call_element *elem,
- grpc_closure_list *closure_list) {
+static char *
+cc_get_peer (grpc_call_element * elem, grpc_closure_list * closure_list)
+{
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_subchannel_call *subchannel_call;
char *result;
- gpr_mu_lock(&calld->mu_state);
- if (calld->state == CALL_ACTIVE) {
- subchannel_call = calld->subchannel_call;
- GRPC_SUBCHANNEL_CALL_REF(subchannel_call, "get_peer");
- gpr_mu_unlock(&calld->mu_state);
- result = grpc_subchannel_call_get_peer(subchannel_call, closure_list);
- GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "get_peer", closure_list);
- return result;
- } else {
- gpr_mu_unlock(&calld->mu_state);
- return grpc_channel_get_target(chand->master);
- }
+ gpr_mu_lock (&calld->mu_state);
+ if (calld->state == CALL_ACTIVE)
+ {
+ subchannel_call = calld->subchannel_call;
+ GRPC_SUBCHANNEL_CALL_REF (subchannel_call, "get_peer");
+ gpr_mu_unlock (&calld->mu_state);
+ result = grpc_subchannel_call_get_peer (subchannel_call, closure_list);
+ GRPC_SUBCHANNEL_CALL_UNREF (subchannel_call, "get_peer", closure_list);
+ return result;
+ }
+ else
+ {
+ gpr_mu_unlock (&calld->mu_state);
+ return grpc_channel_get_target (chand->master);
+ }
}
-static void perform_transport_stream_op(grpc_call_element *elem,
- grpc_transport_stream_op *op,
- int continuation,
- grpc_closure_list *closure_list) {
+static void
+perform_transport_stream_op (grpc_call_element * elem, grpc_transport_stream_op * op, int continuation, grpc_closure_list * closure_list)
+{
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_subchannel_call *subchannel_call;
grpc_lb_policy *lb_policy;
grpc_transport_stream_op op2;
- GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ GPR_ASSERT (elem->filter == &grpc_client_channel_filter);
+ GRPC_CALL_LOG_OP (GPR_INFO, elem, op);
- gpr_mu_lock(&calld->mu_state);
- switch (calld->state) {
+ gpr_mu_lock (&calld->mu_state);
+ switch (calld->state)
+ {
case CALL_ACTIVE:
- GPR_ASSERT(!continuation);
+ GPR_ASSERT (!continuation);
subchannel_call = calld->subchannel_call;
- gpr_mu_unlock(&calld->mu_state);
- grpc_subchannel_call_process_op(subchannel_call, op, closure_list);
+ gpr_mu_unlock (&calld->mu_state);
+ grpc_subchannel_call_process_op (subchannel_call, op, closure_list);
break;
case CALL_CANCELLED:
- gpr_mu_unlock(&calld->mu_state);
- handle_op_after_cancellation(elem, op, closure_list);
+ gpr_mu_unlock (&calld->mu_state);
+ handle_op_after_cancellation (elem, op, closure_list);
break;
case CALL_WAITING_FOR_SEND:
- GPR_ASSERT(!continuation);
- grpc_closure_list_add(closure_list, merge_into_waiting_op(elem, op), 1);
- if (!calld->waiting_op.send_ops &&
- calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) {
- gpr_mu_unlock(&calld->mu_state);
- break;
- }
+ GPR_ASSERT (!continuation);
+ grpc_closure_list_add (closure_list, merge_into_waiting_op (elem, op), 1);
+ if (!calld->waiting_op.send_ops && calld->waiting_op.cancel_with_status == GRPC_STATUS_OK)
+ {
+ gpr_mu_unlock (&calld->mu_state);
+ break;
+ }
*op = calld->waiting_op;
- memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
+ memset (&calld->waiting_op, 0, sizeof (calld->waiting_op));
continuation = 1;
- /* fall through */
+ /* fall through */
case CALL_WAITING_FOR_CONFIG:
case CALL_WAITING_FOR_PICK:
case CALL_WAITING_FOR_CALL:
- if (!continuation) {
- if (op->cancel_with_status != GRPC_STATUS_OK) {
- calld->state = CALL_CANCELLED;
- op2 = calld->waiting_op;
- memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
- if (op->on_consumed) {
- calld->waiting_op.on_consumed = op->on_consumed;
- op->on_consumed = NULL;
- } else if (op2.on_consumed) {
- calld->waiting_op.on_consumed = op2.on_consumed;
- op2.on_consumed = NULL;
- }
- gpr_mu_unlock(&calld->mu_state);
- handle_op_after_cancellation(elem, op, closure_list);
- handle_op_after_cancellation(elem, &op2, closure_list);
- } else {
- grpc_closure_list_add(closure_list, merge_into_waiting_op(elem, op),
- 1);
- gpr_mu_unlock(&calld->mu_state);
- }
- break;
- }
- /* fall through */
+ if (!continuation)
+ {
+ if (op->cancel_with_status != GRPC_STATUS_OK)
+ {
+ calld->state = CALL_CANCELLED;
+ op2 = calld->waiting_op;
+ memset (&calld->waiting_op, 0, sizeof (calld->waiting_op));
+ if (op->on_consumed)
+ {
+ calld->waiting_op.on_consumed = op->on_consumed;
+ op->on_consumed = NULL;
+ }
+ else if (op2.on_consumed)
+ {
+ calld->waiting_op.on_consumed = op2.on_consumed;
+ op2.on_consumed = NULL;
+ }
+ gpr_mu_unlock (&calld->mu_state);
+ handle_op_after_cancellation (elem, op, closure_list);
+ handle_op_after_cancellation (elem, &op2, closure_list);
+ }
+ else
+ {
+ grpc_closure_list_add (closure_list, merge_into_waiting_op (elem, op), 1);
+ gpr_mu_unlock (&calld->mu_state);
+ }
+ break;
+ }
+ /* fall through */
case CALL_CREATED:
- if (op->cancel_with_status != GRPC_STATUS_OK) {
- calld->state = CALL_CANCELLED;
- gpr_mu_unlock(&calld->mu_state);
- handle_op_after_cancellation(elem, op, closure_list);
- } else {
- calld->waiting_op = *op;
-
- if (op->send_ops == NULL) {
- /* need to have some send ops before we can select the
- lb target */
- calld->state = CALL_WAITING_FOR_SEND;
- gpr_mu_unlock(&calld->mu_state);
- } else {
- gpr_mu_lock(&chand->mu_config);
- lb_policy = chand->lb_policy;
- if (lb_policy) {
- grpc_transport_stream_op *op = &calld->waiting_op;
- grpc_pollset *bind_pollset = op->bind_pollset;
- grpc_metadata_batch *initial_metadata =
- &op->send_ops->ops[0].data.metadata;
- GRPC_LB_POLICY_REF(lb_policy, "pick");
- gpr_mu_unlock(&chand->mu_config);
- calld->state = CALL_WAITING_FOR_PICK;
-
- GPR_ASSERT(op->bind_pollset);
- GPR_ASSERT(op->send_ops);
- GPR_ASSERT(op->send_ops->nops >= 1);
- GPR_ASSERT(op->send_ops->ops[0].type == GRPC_OP_METADATA);
- gpr_mu_unlock(&calld->mu_state);
-
- grpc_closure_init(&calld->async_setup_task, picked_target, calld);
- grpc_lb_policy_pick(lb_policy, bind_pollset, initial_metadata,
- &calld->picked_channel,
- &calld->async_setup_task, closure_list);
-
- GRPC_LB_POLICY_UNREF(lb_policy, "pick", closure_list);
- } else if (chand->resolver != NULL) {
- calld->state = CALL_WAITING_FOR_CONFIG;
- add_to_lb_policy_wait_queue_locked_state_config(elem);
- if (!chand->started_resolving && chand->resolver != NULL) {
- GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
- chand->started_resolving = 1;
- grpc_resolver_next(chand->resolver,
- &chand->incoming_configuration,
- &chand->on_config_changed, closure_list);
- }
- gpr_mu_unlock(&chand->mu_config);
- gpr_mu_unlock(&calld->mu_state);
- } else {
- calld->state = CALL_CANCELLED;
- gpr_mu_unlock(&chand->mu_config);
- gpr_mu_unlock(&calld->mu_state);
- handle_op_after_cancellation(elem, op, closure_list);
- }
- }
- }
+ if (op->cancel_with_status != GRPC_STATUS_OK)
+ {
+ calld->state = CALL_CANCELLED;
+ gpr_mu_unlock (&calld->mu_state);
+ handle_op_after_cancellation (elem, op, closure_list);
+ }
+ else
+ {
+ calld->waiting_op = *op;
+
+ if (op->send_ops == NULL)
+ {
+ /* need to have some send ops before we can select the
+ lb target */
+ calld->state = CALL_WAITING_FOR_SEND;
+ gpr_mu_unlock (&calld->mu_state);
+ }
+ else
+ {
+ gpr_mu_lock (&chand->mu_config);
+ lb_policy = chand->lb_policy;
+ if (lb_policy)
+ {
+ grpc_transport_stream_op *op = &calld->waiting_op;
+ grpc_pollset *bind_pollset = op->bind_pollset;
+ grpc_metadata_batch *initial_metadata = &op->send_ops->ops[0].data.metadata;
+ GRPC_LB_POLICY_REF (lb_policy, "pick");
+ gpr_mu_unlock (&chand->mu_config);
+ calld->state = CALL_WAITING_FOR_PICK;
+
+ GPR_ASSERT (op->bind_pollset);
+ GPR_ASSERT (op->send_ops);
+ GPR_ASSERT (op->send_ops->nops >= 1);
+ GPR_ASSERT (op->send_ops->ops[0].type == GRPC_OP_METADATA);
+ gpr_mu_unlock (&calld->mu_state);
+
+ grpc_closure_init (&calld->async_setup_task, picked_target, calld);
+ grpc_lb_policy_pick (lb_policy, bind_pollset, initial_metadata, &calld->picked_channel, &calld->async_setup_task, closure_list);
+
+ GRPC_LB_POLICY_UNREF (lb_policy, "pick", closure_list);
+ }
+ else if (chand->resolver != NULL)
+ {
+ calld->state = CALL_WAITING_FOR_CONFIG;
+ add_to_lb_policy_wait_queue_locked_state_config (elem);
+ if (!chand->started_resolving && chand->resolver != NULL)
+ {
+ GRPC_CHANNEL_INTERNAL_REF (chand->master, "resolver");
+ chand->started_resolving = 1;
+ grpc_resolver_next (chand->resolver, &chand->incoming_configuration, &chand->on_config_changed, closure_list);
+ }
+ gpr_mu_unlock (&chand->mu_config);
+ gpr_mu_unlock (&calld->mu_state);
+ }
+ else
+ {
+ calld->state = CALL_CANCELLED;
+ gpr_mu_unlock (&chand->mu_config);
+ gpr_mu_unlock (&calld->mu_state);
+ handle_op_after_cancellation (elem, op, closure_list);
+ }
+ }
+ }
break;
- }
+ }
}
-static void cc_start_transport_stream_op(grpc_call_element *elem,
- grpc_transport_stream_op *op,
- grpc_closure_list *closure_list) {
- perform_transport_stream_op(elem, op, 0, closure_list);
+static void
+cc_start_transport_stream_op (grpc_call_element * elem, grpc_transport_stream_op * op, grpc_closure_list * closure_list)
+{
+ perform_transport_stream_op (elem, op, 0, closure_list);
}
-static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy,
- grpc_connectivity_state current_state,
- grpc_closure_list *cl);
+static void watch_lb_policy (channel_data * chand, grpc_lb_policy * lb_policy, grpc_connectivity_state current_state, grpc_closure_list * cl);
-static void on_lb_policy_state_changed_locked(lb_policy_connectivity_watcher *w,
- grpc_closure_list *cl) {
+static void
+on_lb_policy_state_changed_locked (lb_policy_connectivity_watcher * w, grpc_closure_list * cl)
+{
/* check if the notification is for a stale policy */
- if (w->lb_policy != w->chand->lb_policy) return;
+ if (w->lb_policy != w->chand->lb_policy)
+ return;
- grpc_connectivity_state_set(&w->chand->state_tracker, w->state, "lb_changed",
- cl);
- if (w->state != GRPC_CHANNEL_FATAL_FAILURE) {
- watch_lb_policy(w->chand, w->lb_policy, w->state, cl);
- }
+ grpc_connectivity_state_set (&w->chand->state_tracker, w->state, "lb_changed", cl);
+ if (w->state != GRPC_CHANNEL_FATAL_FAILURE)
+ {
+ watch_lb_policy (w->chand, w->lb_policy, w->state, cl);
+ }
}
-static void on_lb_policy_state_changed(void *arg, int iomgr_success,
- grpc_closure_list *closure_list) {
+static void
+on_lb_policy_state_changed (void *arg, int iomgr_success, grpc_closure_list * closure_list)
+{
lb_policy_connectivity_watcher *w = arg;
- gpr_mu_lock(&w->chand->mu_config);
- on_lb_policy_state_changed_locked(w, closure_list);
- gpr_mu_unlock(&w->chand->mu_config);
+ gpr_mu_lock (&w->chand->mu_config);
+ on_lb_policy_state_changed_locked (w, closure_list);
+ gpr_mu_unlock (&w->chand->mu_config);
- GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy",
- closure_list);
- gpr_free(w);
+ GRPC_CHANNEL_INTERNAL_UNREF (w->chand->master, "watch_lb_policy", closure_list);
+ gpr_free (w);
}
-static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy,
- grpc_connectivity_state current_state,
- grpc_closure_list *closure_list) {
- lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
- GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy");
+static void
+watch_lb_policy (channel_data * chand, grpc_lb_policy * lb_policy, grpc_connectivity_state current_state, grpc_closure_list * closure_list)
+{
+ lb_policy_connectivity_watcher *w = gpr_malloc (sizeof (*w));
+ GRPC_CHANNEL_INTERNAL_REF (chand->master, "watch_lb_policy");
w->chand = chand;
- grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w);
+ grpc_closure_init (&w->on_changed, on_lb_policy_state_changed, w);
w->state = current_state;
w->lb_policy = lb_policy;
- grpc_lb_policy_notify_on_state_change(lb_policy, &w->state, &w->on_changed,
- closure_list);
+ grpc_lb_policy_notify_on_state_change (lb_policy, &w->state, &w->on_changed, closure_list);
}
-static void cc_on_config_changed(void *arg, int iomgr_success,
- grpc_closure_list *closure_list) {
+static void
+cc_on_config_changed (void *arg, int iomgr_success, grpc_closure_list * closure_list)
+{
channel_data *chand = arg;
grpc_lb_policy *lb_policy = NULL;
grpc_lb_policy *old_lb_policy;
@@ -491,298 +534,313 @@ static void cc_on_config_changed(void *arg, int iomgr_success,
grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
int exit_idle = 0;
- if (chand->incoming_configuration != NULL) {
- lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration);
- if (lb_policy != NULL) {
- GRPC_LB_POLICY_REF(lb_policy, "channel");
- GRPC_LB_POLICY_REF(lb_policy, "config_change");
- state = grpc_lb_policy_check_connectivity(lb_policy, closure_list);
+ if (chand->incoming_configuration != NULL)
+ {
+ lb_policy = grpc_client_config_get_lb_policy (chand->incoming_configuration);
+ if (lb_policy != NULL)
+ {
+ GRPC_LB_POLICY_REF (lb_policy, "channel");
+ GRPC_LB_POLICY_REF (lb_policy, "config_change");
+ state = grpc_lb_policy_check_connectivity (lb_policy, closure_list);
+ }
+
+ grpc_client_config_unref (chand->incoming_configuration, closure_list);
}
- grpc_client_config_unref(chand->incoming_configuration, closure_list);
- }
-
chand->incoming_configuration = NULL;
- gpr_mu_lock(&chand->mu_config);
+ gpr_mu_lock (&chand->mu_config);
old_lb_policy = chand->lb_policy;
chand->lb_policy = lb_policy;
- if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) {
- grpc_closure_list_move(&chand->waiting_for_config_closures, closure_list);
- }
- if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) {
- GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
- exit_idle = 1;
- chand->exit_idle_when_lb_policy_arrives = 0;
- }
-
- if (iomgr_success && chand->resolver) {
- grpc_resolver *resolver = chand->resolver;
- GRPC_RESOLVER_REF(resolver, "channel-next");
- grpc_connectivity_state_set(&chand->state_tracker, state, "new_lb+resolver",
- closure_list);
- if (lb_policy != NULL) {
- watch_lb_policy(chand, lb_policy, state, closure_list);
- }
- gpr_mu_unlock(&chand->mu_config);
- GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
- grpc_resolver_next(resolver, &chand->incoming_configuration,
- &chand->on_config_changed, closure_list);
- GRPC_RESOLVER_UNREF(resolver, "channel-next", closure_list);
- } else {
- old_resolver = chand->resolver;
- chand->resolver = NULL;
- grpc_connectivity_state_set(&chand->state_tracker,
- GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone",
- closure_list);
- gpr_mu_unlock(&chand->mu_config);
- if (old_resolver != NULL) {
- grpc_resolver_shutdown(old_resolver, closure_list);
- GRPC_RESOLVER_UNREF(old_resolver, "channel", closure_list);
- }
- }
-
- if (exit_idle) {
- grpc_lb_policy_exit_idle(lb_policy, closure_list);
- GRPC_LB_POLICY_UNREF(lb_policy, "exit_idle", closure_list);
- }
-
- if (old_lb_policy != NULL) {
- grpc_lb_policy_shutdown(old_lb_policy, closure_list);
- GRPC_LB_POLICY_UNREF(old_lb_policy, "channel", closure_list);
- }
-
- if (lb_policy != NULL) {
- GRPC_LB_POLICY_UNREF(lb_policy, "config_change", closure_list);
- }
-
- GRPC_CHANNEL_INTERNAL_UNREF(chand->master, "resolver", closure_list);
+ if (lb_policy != NULL || chand->resolver == NULL /* disconnected */ )
+ {
+ grpc_closure_list_move (&chand->waiting_for_config_closures, closure_list);
+ }
+ if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives)
+ {
+ GRPC_LB_POLICY_REF (lb_policy, "exit_idle");
+ exit_idle = 1;
+ chand->exit_idle_when_lb_policy_arrives = 0;
+ }
+
+ if (iomgr_success && chand->resolver)
+ {
+ grpc_resolver *resolver = chand->resolver;
+ GRPC_RESOLVER_REF (resolver, "channel-next");
+ grpc_connectivity_state_set (&chand->state_tracker, state, "new_lb+resolver", closure_list);
+ if (lb_policy != NULL)
+ {
+ watch_lb_policy (chand, lb_policy, state, closure_list);
+ }
+ gpr_mu_unlock (&chand->mu_config);
+ GRPC_CHANNEL_INTERNAL_REF (chand->master, "resolver");
+ grpc_resolver_next (resolver, &chand->incoming_configuration, &chand->on_config_changed, closure_list);
+ GRPC_RESOLVER_UNREF (resolver, "channel-next", closure_list);
+ }
+ else
+ {
+ old_resolver = chand->resolver;
+ chand->resolver = NULL;
+ grpc_connectivity_state_set (&chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone", closure_list);
+ gpr_mu_unlock (&chand->mu_config);
+ if (old_resolver != NULL)
+ {
+ grpc_resolver_shutdown (old_resolver, closure_list);
+ GRPC_RESOLVER_UNREF (old_resolver, "channel", closure_list);
+ }
+ }
+
+ if (exit_idle)
+ {
+ grpc_lb_policy_exit_idle (lb_policy, closure_list);
+ GRPC_LB_POLICY_UNREF (lb_policy, "exit_idle", closure_list);
+ }
+
+ if (old_lb_policy != NULL)
+ {
+ grpc_lb_policy_shutdown (old_lb_policy, closure_list);
+ GRPC_LB_POLICY_UNREF (old_lb_policy, "channel", closure_list);
+ }
+
+ if (lb_policy != NULL)
+ {
+ GRPC_LB_POLICY_UNREF (lb_policy, "config_change", closure_list);
+ }
+
+ GRPC_CHANNEL_INTERNAL_UNREF (chand->master, "resolver", closure_list);
}
-static void cc_start_transport_op(grpc_channel_element *elem,
- grpc_transport_op *op,
- grpc_closure_list *closure_list) {
+static void
+cc_start_transport_op (grpc_channel_element * elem, grpc_transport_op * op, grpc_closure_list * closure_list)
+{
grpc_lb_policy *lb_policy = NULL;
channel_data *chand = elem->channel_data;
grpc_resolver *destroy_resolver = NULL;
- grpc_closure_list_add(closure_list, op->on_consumed, 1);
-
- GPR_ASSERT(op->set_accept_stream == NULL);
- GPR_ASSERT(op->bind_pollset == NULL);
-
- gpr_mu_lock(&chand->mu_config);
- if (op->on_connectivity_state_change != NULL) {
- grpc_connectivity_state_notify_on_state_change(
- &chand->state_tracker, op->connectivity_state,
- op->on_connectivity_state_change, closure_list);
- op->on_connectivity_state_change = NULL;
- op->connectivity_state = NULL;
- }
-
- if (!is_empty(op, sizeof(*op))) {
- lb_policy = chand->lb_policy;
- if (lb_policy) {
- GRPC_LB_POLICY_REF(lb_policy, "broadcast");
- }
- }
-
- if (op->disconnect && chand->resolver != NULL) {
- grpc_connectivity_state_set(&chand->state_tracker,
- GRPC_CHANNEL_FATAL_FAILURE, "disconnect",
- closure_list);
- destroy_resolver = chand->resolver;
- chand->resolver = NULL;
- if (chand->lb_policy != NULL) {
- grpc_lb_policy_shutdown(chand->lb_policy, closure_list);
- GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel", closure_list);
- chand->lb_policy = NULL;
- }
- }
- gpr_mu_unlock(&chand->mu_config);
-
- if (destroy_resolver) {
- grpc_resolver_shutdown(destroy_resolver, closure_list);
- GRPC_RESOLVER_UNREF(destroy_resolver, "channel", closure_list);
- }
-
- if (lb_policy) {
- grpc_lb_policy_broadcast(lb_policy, op, closure_list);
- GRPC_LB_POLICY_UNREF(lb_policy, "broadcast", closure_list);
- }
+ grpc_closure_list_add (closure_list, op->on_consumed, 1);
+
+ GPR_ASSERT (op->set_accept_stream == NULL);
+ GPR_ASSERT (op->bind_pollset == NULL);
+
+ gpr_mu_lock (&chand->mu_config);
+ if (op->on_connectivity_state_change != NULL)
+ {
+ grpc_connectivity_state_notify_on_state_change (&chand->state_tracker, op->connectivity_state, op->on_connectivity_state_change, closure_list);
+ op->on_connectivity_state_change = NULL;
+ op->connectivity_state = NULL;
+ }
+
+ if (!is_empty (op, sizeof (*op)))
+ {
+ lb_policy = chand->lb_policy;
+ if (lb_policy)
+ {
+ GRPC_LB_POLICY_REF (lb_policy, "broadcast");
+ }
+ }
+
+ if (op->disconnect && chand->resolver != NULL)
+ {
+ grpc_connectivity_state_set (&chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "disconnect", closure_list);
+ destroy_resolver = chand->resolver;
+ chand->resolver = NULL;
+ if (chand->lb_policy != NULL)
+ {
+ grpc_lb_policy_shutdown (chand->lb_policy, closure_list);
+ GRPC_LB_POLICY_UNREF (chand->lb_policy, "channel", closure_list);
+ chand->lb_policy = NULL;
+ }
+ }
+ gpr_mu_unlock (&chand->mu_config);
+
+ if (destroy_resolver)
+ {
+ grpc_resolver_shutdown (destroy_resolver, closure_list);
+ GRPC_RESOLVER_UNREF (destroy_resolver, "channel", closure_list);
+ }
+
+ if (lb_policy)
+ {
+ grpc_lb_policy_broadcast (lb_policy, op, closure_list);
+ GRPC_LB_POLICY_UNREF (lb_policy, "broadcast", closure_list);
+ }
}
/* Constructor for call_data */
-static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data,
- grpc_transport_stream_op *initial_op,
- grpc_closure_list *closure_list) {
+static void
+init_call_elem (grpc_call_element * elem, const void *server_transport_data, grpc_transport_stream_op * initial_op, grpc_closure_list * closure_list)
+{
call_data *calld = elem->call_data;
/* TODO(ctiller): is there something useful we can do here? */
- GPR_ASSERT(initial_op == NULL);
+ GPR_ASSERT (initial_op == NULL);
- GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
- GPR_ASSERT(server_transport_data == NULL);
- gpr_mu_init(&calld->mu_state);
+ GPR_ASSERT (elem->filter == &grpc_client_channel_filter);
+ GPR_ASSERT (server_transport_data == NULL);
+ gpr_mu_init (&calld->mu_state);
calld->elem = elem;
calld->state = CALL_CREATED;
- calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
+ calld->deadline = gpr_inf_future (GPR_CLOCK_REALTIME);
}
/* Destructor for call_data */
-static void destroy_call_elem(grpc_call_element *elem,
- grpc_closure_list *closure_list) {
+static void
+destroy_call_elem (grpc_call_element * elem, grpc_closure_list * closure_list)
+{
call_data *calld = elem->call_data;
grpc_subchannel_call *subchannel_call;
/* if the call got activated, we need to destroy the child stack also, and
remove it from the in-flight requests tracked by the child_entry we
picked */
- gpr_mu_lock(&calld->mu_state);
- switch (calld->state) {
+ gpr_mu_lock (&calld->mu_state);
+ switch (calld->state)
+ {
case CALL_ACTIVE:
subchannel_call = calld->subchannel_call;
- gpr_mu_unlock(&calld->mu_state);
- GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "client_channel",
- closure_list);
+ gpr_mu_unlock (&calld->mu_state);
+ GRPC_SUBCHANNEL_CALL_UNREF (subchannel_call, "client_channel", closure_list);
break;
case CALL_CREATED:
case CALL_CANCELLED:
- gpr_mu_unlock(&calld->mu_state);
+ gpr_mu_unlock (&calld->mu_state);
break;
case CALL_WAITING_FOR_PICK:
case CALL_WAITING_FOR_CONFIG:
case CALL_WAITING_FOR_CALL:
case CALL_WAITING_FOR_SEND:
- gpr_log(GPR_ERROR, "should never reach here");
- abort();
+ gpr_log (GPR_ERROR, "should never reach here");
+ abort ();
break;
- }
+ }
}
/* Constructor for channel_data */
-static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
- const grpc_channel_args *args,
- grpc_mdctx *metadata_context, int is_first,
- int is_last, grpc_closure_list *closure_list) {
+static void
+init_channel_elem (grpc_channel_element * elem, grpc_channel * master, const grpc_channel_args * args, grpc_mdctx * metadata_context, int is_first, int is_last, grpc_closure_list * closure_list)
+{
channel_data *chand = elem->channel_data;
- memset(chand, 0, sizeof(*chand));
+ memset (chand, 0, sizeof (*chand));
- GPR_ASSERT(is_last);
- GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
+ GPR_ASSERT (is_last);
+ GPR_ASSERT (elem->filter == &grpc_client_channel_filter);
- gpr_mu_init(&chand->mu_config);
+ gpr_mu_init (&chand->mu_config);
chand->mdctx = metadata_context;
chand->master = master;
- grpc_pollset_set_init(&chand->pollset_set);
- grpc_closure_init(&chand->on_config_changed, cc_on_config_changed, chand);
+ grpc_pollset_set_init (&chand->pollset_set);
+ grpc_closure_init (&chand->on_config_changed, cc_on_config_changed, chand);
- grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
- "client_channel");
+ grpc_connectivity_state_init (&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel");
}
/* Destructor for channel_data */
-static void destroy_channel_elem(grpc_channel_element *elem,
- grpc_closure_list *closure_list) {
+static void
+destroy_channel_elem (grpc_channel_element * elem, grpc_closure_list * closure_list)
+{
channel_data *chand = elem->channel_data;
- if (chand->resolver != NULL) {
- grpc_resolver_shutdown(chand->resolver, closure_list);
- GRPC_RESOLVER_UNREF(chand->resolver, "channel", closure_list);
- }
- if (chand->lb_policy != NULL) {
- GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel", closure_list);
- }
- grpc_connectivity_state_destroy(&chand->state_tracker, closure_list);
- grpc_pollset_set_destroy(&chand->pollset_set);
- gpr_mu_destroy(&chand->mu_config);
+ if (chand->resolver != NULL)
+ {
+ grpc_resolver_shutdown (chand->resolver, closure_list);
+ GRPC_RESOLVER_UNREF (chand->resolver, "channel", closure_list);
+ }
+ if (chand->lb_policy != NULL)
+ {
+ GRPC_LB_POLICY_UNREF (chand->lb_policy, "channel", closure_list);
+ }
+ grpc_connectivity_state_destroy (&chand->state_tracker, closure_list);
+ grpc_pollset_set_destroy (&chand->pollset_set);
+ gpr_mu_destroy (&chand->mu_config);
}
const grpc_channel_filter grpc_client_channel_filter = {
- cc_start_transport_stream_op,
- cc_start_transport_op,
- sizeof(call_data),
- init_call_elem,
- destroy_call_elem,
- sizeof(channel_data),
- init_channel_elem,
- destroy_channel_elem,
- cc_get_peer,
- "client-channel",
+ cc_start_transport_stream_op,
+ cc_start_transport_op,
+ sizeof (call_data),
+ init_call_elem,
+ destroy_call_elem,
+ sizeof (channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ cc_get_peer,
+ "client-channel",
};
-void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
- grpc_resolver *resolver,
- grpc_closure_list *closure_list) {
+void
+grpc_client_channel_set_resolver (grpc_channel_stack * channel_stack, grpc_resolver * resolver, grpc_closure_list * closure_list)
+{
/* post construction initialization: set the transport setup pointer */
- grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
+ grpc_channel_element *elem = grpc_channel_stack_last_element (channel_stack);
channel_data *chand = elem->channel_data;
- gpr_mu_lock(&chand->mu_config);
- GPR_ASSERT(!chand->resolver);
+ gpr_mu_lock (&chand->mu_config);
+ GPR_ASSERT (!chand->resolver);
chand->resolver = resolver;
- GRPC_RESOLVER_REF(resolver, "channel");
- if (!grpc_closure_list_empty(chand->waiting_for_config_closures) ||
- chand->exit_idle_when_lb_policy_arrives) {
- chand->started_resolving = 1;
- GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
- grpc_resolver_next(resolver, &chand->incoming_configuration,
- &chand->on_config_changed, closure_list);
- }
- gpr_mu_unlock(&chand->mu_config);
+ GRPC_RESOLVER_REF (resolver, "channel");
+ if (!grpc_closure_list_empty (chand->waiting_for_config_closures) || chand->exit_idle_when_lb_policy_arrives)
+ {
+ chand->started_resolving = 1;
+ GRPC_CHANNEL_INTERNAL_REF (chand->master, "resolver");
+ grpc_resolver_next (resolver, &chand->incoming_configuration, &chand->on_config_changed, closure_list);
+ }
+ gpr_mu_unlock (&chand->mu_config);
}
-grpc_connectivity_state grpc_client_channel_check_connectivity_state(
- grpc_channel_element *elem, int try_to_connect,
- grpc_closure_list *closure_list) {
+grpc_connectivity_state
+grpc_client_channel_check_connectivity_state (grpc_channel_element * elem, int try_to_connect, grpc_closure_list * closure_list)
+{
channel_data *chand = elem->channel_data;
grpc_connectivity_state out;
- gpr_mu_lock(&chand->mu_config);
- out = grpc_connectivity_state_check(&chand->state_tracker);
- if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
- if (chand->lb_policy != NULL) {
- grpc_lb_policy_exit_idle(chand->lb_policy, closure_list);
- } else {
- chand->exit_idle_when_lb_policy_arrives = 1;
- if (!chand->started_resolving && chand->resolver != NULL) {
- GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
- chand->started_resolving = 1;
- grpc_resolver_next(chand->resolver, &chand->incoming_configuration,
- &chand->on_config_changed, closure_list);
- }
- }
- }
- gpr_mu_unlock(&chand->mu_config);
+ gpr_mu_lock (&chand->mu_config);
+ out = grpc_connectivity_state_check (&chand->state_tracker);
+ if (out == GRPC_CHANNEL_IDLE && try_to_connect)
+ {
+ if (chand->lb_policy != NULL)
+ {
+ grpc_lb_policy_exit_idle (chand->lb_policy, closure_list);
+ }
+ else
+ {
+ chand->exit_idle_when_lb_policy_arrives = 1;
+ if (!chand->started_resolving && chand->resolver != NULL)
+ {
+ GRPC_CHANNEL_INTERNAL_REF (chand->master, "resolver");
+ chand->started_resolving = 1;
+ grpc_resolver_next (chand->resolver, &chand->incoming_configuration, &chand->on_config_changed, closure_list);
+ }
+ }
+ }
+ gpr_mu_unlock (&chand->mu_config);
return out;
}
-void grpc_client_channel_watch_connectivity_state(
- grpc_channel_element *elem, grpc_connectivity_state *state,
- grpc_closure *on_complete, grpc_closure_list *closure_list) {
+void
+grpc_client_channel_watch_connectivity_state (grpc_channel_element * elem, grpc_connectivity_state * state, grpc_closure * on_complete, grpc_closure_list * closure_list)
+{
channel_data *chand = elem->channel_data;
- gpr_mu_lock(&chand->mu_config);
- grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, state,
- on_complete, closure_list);
- gpr_mu_unlock(&chand->mu_config);
+ gpr_mu_lock (&chand->mu_config);
+ grpc_connectivity_state_notify_on_state_change (&chand->state_tracker, state, on_complete, closure_list);
+ gpr_mu_unlock (&chand->mu_config);
}
-grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(
- grpc_channel_element *elem) {
+grpc_pollset_set *
+grpc_client_channel_get_connecting_pollset_set (grpc_channel_element * elem)
+{
channel_data *chand = elem->channel_data;
return &chand->pollset_set;
}
-void grpc_client_channel_add_interested_party(grpc_channel_element *elem,
- grpc_pollset *pollset,
- grpc_closure_list *closure_list) {
+void
+grpc_client_channel_add_interested_party (grpc_channel_element * elem, grpc_pollset * pollset, grpc_closure_list * closure_list)
+{
channel_data *chand = elem->channel_data;
- grpc_pollset_set_add_pollset(&chand->pollset_set, pollset, closure_list);
+ grpc_pollset_set_add_pollset (&chand->pollset_set, pollset, closure_list);
}
-void grpc_client_channel_del_interested_party(grpc_channel_element *elem,
- grpc_pollset *pollset,
- grpc_closure_list *closure_list) {
+void
+grpc_client_channel_del_interested_party (grpc_channel_element * elem, grpc_pollset * pollset, grpc_closure_list * closure_list)
+{
channel_data *chand = elem->channel_data;
- grpc_pollset_set_del_pollset(&chand->pollset_set, pollset, closure_list);
+ grpc_pollset_set_del_pollset (&chand->pollset_set, pollset, closure_list);
}
diff --git a/src/core/channel/client_channel.h b/src/core/channel/client_channel.h
index d200f6e4b0..468cf5ef3e 100644
--- a/src/core/channel/client_channel.h
+++ b/src/core/channel/client_channel.h
@@ -49,26 +49,15 @@ extern const grpc_channel_filter grpc_client_channel_filter;
/* post-construction initializer to let the client channel know which
transport setup it should cancel upon destruction, or initiate when it needs
a connection */
-void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
- grpc_resolver *resolver,
- grpc_closure_list *closure_list);
+void grpc_client_channel_set_resolver (grpc_channel_stack * channel_stack, grpc_resolver * resolver, grpc_closure_list * closure_list);
-grpc_connectivity_state grpc_client_channel_check_connectivity_state(
- grpc_channel_element *elem, int try_to_connect,
- grpc_closure_list *closure_list);
+grpc_connectivity_state grpc_client_channel_check_connectivity_state (grpc_channel_element * elem, int try_to_connect, grpc_closure_list * closure_list);
-void grpc_client_channel_watch_connectivity_state(
- grpc_channel_element *elem, grpc_connectivity_state *state,
- grpc_closure *on_complete, grpc_closure_list *closure_list);
+void grpc_client_channel_watch_connectivity_state (grpc_channel_element * elem, grpc_connectivity_state * state, grpc_closure * on_complete, grpc_closure_list * closure_list);
-grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(
- grpc_channel_element *elem);
+grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set (grpc_channel_element * elem);
-void grpc_client_channel_add_interested_party(grpc_channel_element *channel,
- grpc_pollset *pollset,
- grpc_closure_list *closure_list);
-void grpc_client_channel_del_interested_party(grpc_channel_element *channel,
- grpc_pollset *pollset,
- grpc_closure_list *closure_list);
+void grpc_client_channel_add_interested_party (grpc_channel_element * channel, grpc_pollset * pollset, grpc_closure_list * closure_list);
+void grpc_client_channel_del_interested_party (grpc_channel_element * channel, grpc_pollset * pollset, grpc_closure_list * closure_list);
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_CHANNEL_H */
diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c
index 911c689d54..a1c03dc9d9 100644
--- a/src/core/channel/compress_filter.c
+++ b/src/core/channel/compress_filter.c
@@ -44,13 +44,14 @@
#include "src/core/compression/message_compress.h"
#include "src/core/support/string.h"
-typedef struct call_data {
+typedef struct call_data
+{
gpr_slice_buffer slices; /**< Buffers up input slices to be compressed */
grpc_linked_mdelem compression_algorithm_storage;
grpc_linked_mdelem accept_encoding_storage;
- gpr_uint32
- remaining_slice_bytes; /**< Input data to be read, as per BEGIN_MESSAGE */
- int written_initial_metadata; /**< Already processed initial md? */
+ gpr_uint32 remaining_slice_bytes;
+ /**< Input data to be read, as per BEGIN_MESSAGE */
+ int written_initial_metadata; /**< Already processed initial md? */
/** Compression algorithm we'll try to use. It may be given by incoming
* metadata, or by the channel's default compression settings. */
grpc_compression_algorithm compression_algorithm;
@@ -58,7 +59,8 @@ typedef struct call_data {
int has_compression_algorithm;
} call_data;
-typedef struct channel_data {
+typedef struct channel_data
+{
/** Metadata key for the incoming (requested) compression algorithm */
grpc_mdstr *mdstr_request_compression_algorithm_key;
/** Metadata key for the outgoing (used) compression algorithm */
@@ -80,59 +82,62 @@ typedef struct channel_data {
* larger than the raw input).
*
* Returns 1 if the data was actually compress and 0 otherwise. */
-static int compress_send_sb(grpc_compression_algorithm algorithm,
- gpr_slice_buffer *slices) {
+static int
+compress_send_sb (grpc_compression_algorithm algorithm, gpr_slice_buffer * slices)
+{
int did_compress;
gpr_slice_buffer tmp;
- gpr_slice_buffer_init(&tmp);
- did_compress = grpc_msg_compress(algorithm, slices, &tmp);
- if (did_compress) {
- gpr_slice_buffer_swap(slices, &tmp);
- }
- gpr_slice_buffer_destroy(&tmp);
+ gpr_slice_buffer_init (&tmp);
+ did_compress = grpc_msg_compress (algorithm, slices, &tmp);
+ if (did_compress)
+ {
+ gpr_slice_buffer_swap (slices, &tmp);
+ }
+ gpr_slice_buffer_destroy (&tmp);
return did_compress;
}
/** For each \a md element from the incoming metadata, filter out the entry for
* "grpc-encoding", using its value to populate the call data's
* compression_algorithm field. */
-static grpc_mdelem *compression_md_filter(void *user_data, grpc_mdelem *md) {
+static grpc_mdelem *
+compression_md_filter (void *user_data, grpc_mdelem * md)
+{
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
- if (md->key == channeld->mdstr_request_compression_algorithm_key) {
- const char *md_c_str = grpc_mdstr_as_c_string(md->value);
- if (!grpc_compression_algorithm_parse(md_c_str, strlen(md_c_str),
- &calld->compression_algorithm)) {
- gpr_log(GPR_ERROR,
- "Invalid compression algorithm: '%s' (unknown). Ignoring.",
- md_c_str);
- calld->compression_algorithm = GRPC_COMPRESS_NONE;
- }
- if (grpc_compression_options_is_algorithm_enabled(
- &channeld->compression_options, calld->compression_algorithm) ==
- 0) {
- gpr_log(GPR_ERROR,
- "Invalid compression algorithm: '%s' (previously disabled). "
- "Ignoring.",
- md_c_str);
- calld->compression_algorithm = GRPC_COMPRESS_NONE;
+ if (md->key == channeld->mdstr_request_compression_algorithm_key)
+ {
+ const char *md_c_str = grpc_mdstr_as_c_string (md->value);
+ if (!grpc_compression_algorithm_parse (md_c_str, strlen (md_c_str), &calld->compression_algorithm))
+ {
+ gpr_log (GPR_ERROR, "Invalid compression algorithm: '%s' (unknown). Ignoring.", md_c_str);
+ calld->compression_algorithm = GRPC_COMPRESS_NONE;
+ }
+ if (grpc_compression_options_is_algorithm_enabled (&channeld->compression_options, calld->compression_algorithm) == 0)
+ {
+ gpr_log (GPR_ERROR, "Invalid compression algorithm: '%s' (previously disabled). " "Ignoring.", md_c_str);
+ calld->compression_algorithm = GRPC_COMPRESS_NONE;
+ }
+ calld->has_compression_algorithm = 1;
+ return NULL;
}
- calld->has_compression_algorithm = 1;
- return NULL;
- }
return md;
}
-static int skip_compression(channel_data *channeld, call_data *calld) {
- if (calld->has_compression_algorithm) {
- if (calld->compression_algorithm == GRPC_COMPRESS_NONE) {
- return 1;
+static int
+skip_compression (channel_data * channeld, call_data * calld)
+{
+ if (calld->has_compression_algorithm)
+ {
+ if (calld->compression_algorithm == GRPC_COMPRESS_NONE)
+ {
+ return 1;
+ }
+ return 0; /* we have an actual call-specific algorithm */
}
- return 0; /* we have an actual call-specific algorithm */
- }
/* no per-call compression override */
return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE;
}
@@ -141,126 +146,127 @@ static int skip_compression(channel_data *channeld, call_data *calld) {
* the associated GRPC_OP_BEGIN_MESSAGE accordingly (new compressed length,
* flags indicating compression is in effect) and replaces \a send_ops with it.
* */
-static void finish_compressed_sopb(grpc_stream_op_buffer *send_ops,
- grpc_call_element *elem) {
+static void
+finish_compressed_sopb (grpc_stream_op_buffer * send_ops, grpc_call_element * elem)
+{
size_t i;
call_data *calld = elem->call_data;
- int new_slices_added = 0; /* GPR_FALSE */
+ int new_slices_added = 0; /* GPR_FALSE */
grpc_metadata_batch metadata;
grpc_stream_op_buffer new_send_ops;
- grpc_sopb_init(&new_send_ops);
-
- for (i = 0; i < send_ops->nops; i++) {
- grpc_stream_op *sop = &send_ops->ops[i];
- switch (sop->type) {
- case GRPC_OP_BEGIN_MESSAGE:
- GPR_ASSERT(calld->slices.length <= GPR_UINT32_MAX);
- grpc_sopb_add_begin_message(
- &new_send_ops, (gpr_uint32)calld->slices.length,
- sop->data.begin_message.flags | GRPC_WRITE_INTERNAL_COMPRESS);
- break;
- case GRPC_OP_SLICE:
- /* Once we reach the slices section of the original buffer, simply add
- * all the new (compressed) slices. We obviously want to do this only
- * once, hence the "new_slices_added" guard. */
- if (!new_slices_added) {
- size_t j;
- for (j = 0; j < calld->slices.count; ++j) {
- grpc_sopb_add_slice(&new_send_ops,
- gpr_slice_ref(calld->slices.slices[j]));
- }
- new_slices_added = 1; /* GPR_TRUE */
- }
- break;
- case GRPC_OP_METADATA:
- /* move the metadata to the new buffer. */
- grpc_metadata_batch_move(&metadata, &sop->data.metadata);
- grpc_sopb_add_metadata(&new_send_ops, metadata);
- break;
- case GRPC_NO_OP:
- break;
+ grpc_sopb_init (&new_send_ops);
+
+ for (i = 0; i < send_ops->nops; i++)
+ {
+ grpc_stream_op *sop = &send_ops->ops[i];
+ switch (sop->type)
+ {
+ case GRPC_OP_BEGIN_MESSAGE:
+ GPR_ASSERT (calld->slices.length <= GPR_UINT32_MAX);
+ grpc_sopb_add_begin_message (&new_send_ops, (gpr_uint32) calld->slices.length, sop->data.begin_message.flags | GRPC_WRITE_INTERNAL_COMPRESS);
+ break;
+ case GRPC_OP_SLICE:
+ /* Once we reach the slices section of the original buffer, simply add
+ * all the new (compressed) slices. We obviously want to do this only
+ * once, hence the "new_slices_added" guard. */
+ if (!new_slices_added)
+ {
+ size_t j;
+ for (j = 0; j < calld->slices.count; ++j)
+ {
+ grpc_sopb_add_slice (&new_send_ops, gpr_slice_ref (calld->slices.slices[j]));
+ }
+ new_slices_added = 1; /* GPR_TRUE */
+ }
+ break;
+ case GRPC_OP_METADATA:
+ /* move the metadata to the new buffer. */
+ grpc_metadata_batch_move (&metadata, &sop->data.metadata);
+ grpc_sopb_add_metadata (&new_send_ops, metadata);
+ break;
+ case GRPC_NO_OP:
+ break;
+ }
}
- }
- grpc_sopb_swap(send_ops, &new_send_ops);
- grpc_sopb_destroy(&new_send_ops);
+ grpc_sopb_swap (send_ops, &new_send_ops);
+ grpc_sopb_destroy (&new_send_ops);
}
/** Filter's "main" function, called for any incoming grpc_transport_stream_op
* instance that holds a non-zero number of send operations, accesible to this
* function in \a send_ops. */
-static void process_send_ops(grpc_call_element *elem,
- grpc_stream_op_buffer *send_ops) {
+static void
+process_send_ops (grpc_call_element * elem, grpc_stream_op_buffer * send_ops)
+{
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
size_t i;
int did_compress = 0;
/* In streaming calls, we need to reset the previously accumulated slices */
- gpr_slice_buffer_reset_and_unref(&calld->slices);
- for (i = 0; i < send_ops->nops; ++i) {
- grpc_stream_op *sop = &send_ops->ops[i];
- switch (sop->type) {
- case GRPC_OP_BEGIN_MESSAGE:
- /* buffer up slices until we've processed all the expected ones (as
- * given by GRPC_OP_BEGIN_MESSAGE) */
- calld->remaining_slice_bytes = sop->data.begin_message.length;
- if (sop->data.begin_message.flags & GRPC_WRITE_NO_COMPRESS) {
- calld->has_compression_algorithm = 1; /* GPR_TRUE */
- calld->compression_algorithm = GRPC_COMPRESS_NONE;
- }
- break;
- case GRPC_OP_METADATA:
- if (!calld->written_initial_metadata) {
- /* Parse incoming request for compression. If any, it'll be available
- * at calld->compression_algorithm */
- grpc_metadata_batch_filter(&(sop->data.metadata),
- compression_md_filter, elem);
- if (!calld->has_compression_algorithm) {
- /* If no algorithm was found in the metadata and we aren't
- * exceptionally skipping compression, fall back to the channel
- * default */
- calld->compression_algorithm =
- channeld->default_compression_algorithm;
- calld->has_compression_algorithm = 1; /* GPR_TRUE */
- }
- /* hint compression algorithm */
- grpc_metadata_batch_add_tail(
- &(sop->data.metadata), &calld->compression_algorithm_storage,
- GRPC_MDELEM_REF(channeld->mdelem_compression_algorithms
- [calld->compression_algorithm]));
-
- /* convey supported compression algorithms */
- grpc_metadata_batch_add_tail(
- &(sop->data.metadata), &calld->accept_encoding_storage,
- GRPC_MDELEM_REF(channeld->mdelem_accept_encoding));
-
- calld->written_initial_metadata = 1; /* GPR_TRUE */
- }
- break;
- case GRPC_OP_SLICE:
- if (skip_compression(channeld, calld)) continue;
- GPR_ASSERT(calld->remaining_slice_bytes > 0);
- /* Increase input ref count, gpr_slice_buffer_add takes ownership. */
- gpr_slice_buffer_add(&calld->slices, gpr_slice_ref(sop->data.slice));
- GPR_ASSERT(GPR_SLICE_LENGTH(sop->data.slice) >=
- calld->remaining_slice_bytes);
- calld->remaining_slice_bytes -=
- (gpr_uint32)GPR_SLICE_LENGTH(sop->data.slice);
- if (calld->remaining_slice_bytes == 0) {
- did_compress =
- compress_send_sb(calld->compression_algorithm, &calld->slices);
- }
- break;
- case GRPC_NO_OP:
- break;
+ gpr_slice_buffer_reset_and_unref (&calld->slices);
+ for (i = 0; i < send_ops->nops; ++i)
+ {
+ grpc_stream_op *sop = &send_ops->ops[i];
+ switch (sop->type)
+ {
+ case GRPC_OP_BEGIN_MESSAGE:
+ /* buffer up slices until we've processed all the expected ones (as
+ * given by GRPC_OP_BEGIN_MESSAGE) */
+ calld->remaining_slice_bytes = sop->data.begin_message.length;
+ if (sop->data.begin_message.flags & GRPC_WRITE_NO_COMPRESS)
+ {
+ calld->has_compression_algorithm = 1; /* GPR_TRUE */
+ calld->compression_algorithm = GRPC_COMPRESS_NONE;
+ }
+ break;
+ case GRPC_OP_METADATA:
+ if (!calld->written_initial_metadata)
+ {
+ /* Parse incoming request for compression. If any, it'll be available
+ * at calld->compression_algorithm */
+ grpc_metadata_batch_filter (&(sop->data.metadata), compression_md_filter, elem);
+ if (!calld->has_compression_algorithm)
+ {
+ /* If no algorithm was found in the metadata and we aren't
+ * exceptionally skipping compression, fall back to the channel
+ * default */
+ calld->compression_algorithm = channeld->default_compression_algorithm;
+ calld->has_compression_algorithm = 1; /* GPR_TRUE */
+ }
+ /* hint compression algorithm */
+ grpc_metadata_batch_add_tail (&(sop->data.metadata), &calld->compression_algorithm_storage, GRPC_MDELEM_REF (channeld->mdelem_compression_algorithms[calld->compression_algorithm]));
+
+ /* convey supported compression algorithms */
+ grpc_metadata_batch_add_tail (&(sop->data.metadata), &calld->accept_encoding_storage, GRPC_MDELEM_REF (channeld->mdelem_accept_encoding));
+
+ calld->written_initial_metadata = 1; /* GPR_TRUE */
+ }
+ break;
+ case GRPC_OP_SLICE:
+ if (skip_compression (channeld, calld))
+ continue;
+ GPR_ASSERT (calld->remaining_slice_bytes > 0);
+ /* Increase input ref count, gpr_slice_buffer_add takes ownership. */
+ gpr_slice_buffer_add (&calld->slices, gpr_slice_ref (sop->data.slice));
+ GPR_ASSERT (GPR_SLICE_LENGTH (sop->data.slice) >= calld->remaining_slice_bytes);
+ calld->remaining_slice_bytes -= (gpr_uint32) GPR_SLICE_LENGTH (sop->data.slice);
+ if (calld->remaining_slice_bytes == 0)
+ {
+ did_compress = compress_send_sb (calld->compression_algorithm, &calld->slices);
+ }
+ break;
+ case GRPC_NO_OP:
+ break;
+ }
}
- }
/* Modify the send_ops stream_op_buffer depending on whether compression was
* carried out */
- if (did_compress) {
- finish_compressed_sopb(send_ops, elem);
- }
+ if (did_compress)
+ {
+ finish_compressed_sopb (send_ops, elem);
+ }
}
/* Called either:
@@ -268,50 +274,52 @@ static void process_send_ops(grpc_call_element *elem,
- a network event (or similar) from below, to receive something
op contains type and call direction information, in addition to the data
that is being sent or received. */
-static void compress_start_transport_stream_op(
- grpc_call_element *elem, grpc_transport_stream_op *op,
- grpc_closure_list *closure_list) {
- if (op->send_ops && op->send_ops->nops > 0) {
- process_send_ops(elem, op->send_ops);
- }
+static void
+compress_start_transport_stream_op (grpc_call_element * elem, grpc_transport_stream_op * op, grpc_closure_list * closure_list)
+{
+ if (op->send_ops && op->send_ops->nops > 0)
+ {
+ process_send_ops (elem, op->send_ops);
+ }
/* pass control down the stack */
- grpc_call_next_op(elem, op, closure_list);
+ grpc_call_next_op (elem, op, closure_list);
}
/* Constructor for call_data */
-static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data,
- grpc_transport_stream_op *initial_op,
- grpc_closure_list *closure_list) {
+static void
+init_call_elem (grpc_call_element * elem, const void *server_transport_data, grpc_transport_stream_op * initial_op, grpc_closure_list * closure_list)
+{
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
/* initialize members */
- gpr_slice_buffer_init(&calld->slices);
+ gpr_slice_buffer_init (&calld->slices);
calld->has_compression_algorithm = 0;
- calld->written_initial_metadata = 0; /* GPR_FALSE */
-
- if (initial_op) {
- if (initial_op->send_ops && initial_op->send_ops->nops > 0) {
- process_send_ops(elem, initial_op->send_ops);
+ calld->written_initial_metadata = 0; /* GPR_FALSE */
+
+ if (initial_op)
+ {
+ if (initial_op->send_ops && initial_op->send_ops->nops > 0)
+ {
+ process_send_ops (elem, initial_op->send_ops);
+ }
}
- }
}
/* Destructor for call_data */
-static void destroy_call_elem(grpc_call_element *elem,
- grpc_closure_list *closure_list) {
+static void
+destroy_call_elem (grpc_call_element * elem, grpc_closure_list * closure_list)
+{
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
- gpr_slice_buffer_destroy(&calld->slices);
+ gpr_slice_buffer_destroy (&calld->slices);
}
/* Constructor for channel_data */
-static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
- const grpc_channel_args *args, grpc_mdctx *mdctx,
- int is_first, int is_last,
- grpc_closure_list *closure_list) {
+static void
+init_channel_elem (grpc_channel_element * elem, grpc_channel * master, const grpc_channel_args * args, grpc_mdctx * mdctx, int is_first, int is_last, grpc_closure_list * closure_list)
+{
channel_data *channeld = elem->channel_data;
grpc_compression_algorithm algo_idx;
const char *supported_algorithms_names[GRPC_COMPRESS_ALGORITHMS_COUNT - 1];
@@ -319,82 +327,72 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
char *accept_encoding_str;
size_t accept_encoding_str_len;
- grpc_compression_options_init(&channeld->compression_options);
- channeld->compression_options.enabled_algorithms_bitset =
- (gpr_uint32)grpc_channel_args_compression_algorithm_get_states(args);
+ grpc_compression_options_init (&channeld->compression_options);
+ channeld->compression_options.enabled_algorithms_bitset = (gpr_uint32) grpc_channel_args_compression_algorithm_get_states (args);
- channeld->default_compression_algorithm =
- grpc_channel_args_get_compression_algorithm(args);
+ channeld->default_compression_algorithm = grpc_channel_args_get_compression_algorithm (args);
/* Make sure the default isn't disabled. */
- GPR_ASSERT(grpc_compression_options_is_algorithm_enabled(
- &channeld->compression_options, channeld->default_compression_algorithm));
- channeld->compression_options.default_compression_algorithm =
- channeld->default_compression_algorithm;
-
- channeld->mdstr_request_compression_algorithm_key =
- grpc_mdstr_from_string(mdctx, GRPC_COMPRESS_REQUEST_ALGORITHM_KEY, 0);
-
- channeld->mdstr_outgoing_compression_algorithm_key =
- grpc_mdstr_from_string(mdctx, "grpc-encoding", 0);
-
- channeld->mdstr_compression_capabilities_key =
- grpc_mdstr_from_string(mdctx, "grpc-accept-encoding", 0);
-
- for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) {
- char *algorithm_name;
- /* skip disabled algorithms */
- if (grpc_compression_options_is_algorithm_enabled(
- &channeld->compression_options, algo_idx) == 0) {
- continue;
- }
- GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorithm_name) != 0);
- channeld->mdelem_compression_algorithms[algo_idx] =
- grpc_mdelem_from_metadata_strings(
- mdctx,
- GRPC_MDSTR_REF(channeld->mdstr_outgoing_compression_algorithm_key),
- grpc_mdstr_from_string(mdctx, algorithm_name, 0));
- if (algo_idx > 0) {
- supported_algorithms_names[supported_algorithms_idx++] = algorithm_name;
+ GPR_ASSERT (grpc_compression_options_is_algorithm_enabled (&channeld->compression_options, channeld->default_compression_algorithm));
+ channeld->compression_options.default_compression_algorithm = channeld->default_compression_algorithm;
+
+ channeld->mdstr_request_compression_algorithm_key = grpc_mdstr_from_string (mdctx, GRPC_COMPRESS_REQUEST_ALGORITHM_KEY, 0);
+
+ channeld->mdstr_outgoing_compression_algorithm_key = grpc_mdstr_from_string (mdctx, "grpc-encoding", 0);
+
+ channeld->mdstr_compression_capabilities_key = grpc_mdstr_from_string (mdctx, "grpc-accept-encoding", 0);
+
+ for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx)
+ {
+ char *algorithm_name;
+ /* skip disabled algorithms */
+ if (grpc_compression_options_is_algorithm_enabled (&channeld->compression_options, algo_idx) == 0)
+ {
+ continue;
+ }
+ GPR_ASSERT (grpc_compression_algorithm_name (algo_idx, &algorithm_name) != 0);
+ channeld->mdelem_compression_algorithms[algo_idx] = grpc_mdelem_from_metadata_strings (mdctx, GRPC_MDSTR_REF (channeld->mdstr_outgoing_compression_algorithm_key), grpc_mdstr_from_string (mdctx, algorithm_name, 0));
+ if (algo_idx > 0)
+ {
+ supported_algorithms_names[supported_algorithms_idx++] = algorithm_name;
+ }
}
- }
/* TODO(dgq): gpr_strjoin_sep could be made to work with statically allocated
* arrays, as to avoid the heap allocs */
- accept_encoding_str =
- gpr_strjoin_sep(supported_algorithms_names, supported_algorithms_idx, ",",
- &accept_encoding_str_len);
+ accept_encoding_str = gpr_strjoin_sep (supported_algorithms_names, supported_algorithms_idx, ",", &accept_encoding_str_len);
- channeld->mdelem_accept_encoding = grpc_mdelem_from_metadata_strings(
- mdctx, GRPC_MDSTR_REF(channeld->mdstr_compression_capabilities_key),
- grpc_mdstr_from_string(mdctx, accept_encoding_str, 0));
- gpr_free(accept_encoding_str);
+ channeld->mdelem_accept_encoding = grpc_mdelem_from_metadata_strings (mdctx, GRPC_MDSTR_REF (channeld->mdstr_compression_capabilities_key), grpc_mdstr_from_string (mdctx, accept_encoding_str, 0));
+ gpr_free (accept_encoding_str);
- GPR_ASSERT(!is_last);
+ GPR_ASSERT (!is_last);
}
/* Destructor for channel data */
-static void destroy_channel_elem(grpc_channel_element *elem,
- grpc_closure_list *closure_list) {
+static void
+destroy_channel_elem (grpc_channel_element * elem, grpc_closure_list * closure_list)
+{
channel_data *channeld = elem->channel_data;
grpc_compression_algorithm algo_idx;
- GRPC_MDSTR_UNREF(channeld->mdstr_request_compression_algorithm_key);
- GRPC_MDSTR_UNREF(channeld->mdstr_outgoing_compression_algorithm_key);
- GRPC_MDSTR_UNREF(channeld->mdstr_compression_capabilities_key);
- for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) {
- GRPC_MDELEM_UNREF(channeld->mdelem_compression_algorithms[algo_idx]);
- }
- GRPC_MDELEM_UNREF(channeld->mdelem_accept_encoding);
+ GRPC_MDSTR_UNREF (channeld->mdstr_request_compression_algorithm_key);
+ GRPC_MDSTR_UNREF (channeld->mdstr_outgoing_compression_algorithm_key);
+ GRPC_MDSTR_UNREF (channeld->mdstr_compression_capabilities_key);
+ for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx)
+ {
+ GRPC_MDELEM_UNREF (channeld->mdelem_compression_algorithms[algo_idx]);
+ }
+ GRPC_MDELEM_UNREF (channeld->mdelem_accept_encoding);
}
const grpc_channel_filter grpc_compress_filter = {
- compress_start_transport_stream_op,
- grpc_channel_next_op,
- sizeof(call_data),
- init_call_elem,
- destroy_call_elem,
- sizeof(channel_data),
- init_channel_elem,
- destroy_channel_elem,
- grpc_call_next_get_peer,
- "compress"};
+ compress_start_transport_stream_op,
+ grpc_channel_next_op,
+ sizeof (call_data),
+ init_call_elem,
+ destroy_call_elem,
+ sizeof (channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ grpc_call_next_get_peer,
+ "compress"
+};
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c
index b58e180a43..7ba412fe5c 100644
--- a/src/core/channel/connected_channel.c
+++ b/src/core/channel/connected_channel.c
@@ -46,11 +46,15 @@
#define MAX_BUFFER_LENGTH 8192
-typedef struct connected_channel_channel_data {
+typedef struct connected_channel_channel_data
+{
grpc_transport *transport;
} channel_data;
-typedef struct connected_channel_call_data { void *unused; } call_data;
+typedef struct connected_channel_call_data
+{
+ void *unused;
+} call_data;
/* We perform a small hack to locate transport data alongside the connected
channel data in call allocations, to allow everything to be pulled in minimal
@@ -61,98 +65,95 @@ typedef struct connected_channel_call_data { void *unused; } call_data;
/* Intercept a call operation and either push it directly up or translate it
into transport stream operations */
-static void con_start_transport_stream_op(grpc_call_element *elem,
- grpc_transport_stream_op *op,
- grpc_closure_list *closure_list) {
+static void
+con_start_transport_stream_op (grpc_call_element * elem, grpc_transport_stream_op * op, grpc_closure_list * closure_list)
+{
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ GPR_ASSERT (elem->filter == &grpc_connected_channel_filter);
+ GRPC_CALL_LOG_OP (GPR_INFO, elem, op);
- grpc_transport_perform_stream_op(chand->transport,
- TRANSPORT_STREAM_FROM_CALL_DATA(calld), op,
- closure_list);
+ grpc_transport_perform_stream_op (chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA (calld), op, closure_list);
}
-static void con_start_transport_op(grpc_channel_element *elem,
- grpc_transport_op *op,
- grpc_closure_list *closure_list) {
+static void
+con_start_transport_op (grpc_channel_element * elem, grpc_transport_op * op, grpc_closure_list * closure_list)
+{
channel_data *chand = elem->channel_data;
- grpc_transport_perform_op(chand->transport, op, closure_list);
+ grpc_transport_perform_op (chand->transport, op, closure_list);
}
/* Constructor for call_data */
-static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data,
- grpc_transport_stream_op *initial_op,
- grpc_closure_list *closure_list) {
+static void
+init_call_elem (grpc_call_element * elem, const void *server_transport_data, grpc_transport_stream_op * initial_op, grpc_closure_list * closure_list)
+{
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
int r;
- GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
- r = grpc_transport_init_stream(
- chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld),
- server_transport_data, initial_op, closure_list);
- GPR_ASSERT(r == 0);
+ GPR_ASSERT (elem->filter == &grpc_connected_channel_filter);
+ r = grpc_transport_init_stream (chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA (calld), server_transport_data, initial_op, closure_list);
+ GPR_ASSERT (r == 0);
}
/* Destructor for call_data */
-static void destroy_call_elem(grpc_call_element *elem,
- grpc_closure_list *closure_list) {
+static void
+destroy_call_elem (grpc_call_element * elem, grpc_closure_list * closure_list)
+{
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
- grpc_transport_destroy_stream(
- chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), closure_list);
+ GPR_ASSERT (elem->filter == &grpc_connected_channel_filter);
+ grpc_transport_destroy_stream (chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA (calld), closure_list);
}
/* Constructor for channel_data */
-static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
- const grpc_channel_args *args, grpc_mdctx *mdctx,
- int is_first, int is_last,
- grpc_closure_list *closure_list) {
- channel_data *cd = (channel_data *)elem->channel_data;
- GPR_ASSERT(is_last);
- GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
+static void
+init_channel_elem (grpc_channel_element * elem, grpc_channel * master, const grpc_channel_args * args, grpc_mdctx * mdctx, int is_first, int is_last, grpc_closure_list * closure_list)
+{
+ channel_data *cd = (channel_data *) elem->channel_data;
+ GPR_ASSERT (is_last);
+ GPR_ASSERT (elem->filter == &grpc_connected_channel_filter);
cd->transport = NULL;
}
/* Destructor for channel_data */
-static void destroy_channel_elem(grpc_channel_element *elem,
- grpc_closure_list *closure_list) {
- channel_data *cd = (channel_data *)elem->channel_data;
- GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
- grpc_transport_destroy(cd->transport, closure_list);
+static void
+destroy_channel_elem (grpc_channel_element * elem, grpc_closure_list * closure_list)
+{
+ channel_data *cd = (channel_data *) elem->channel_data;
+ GPR_ASSERT (elem->filter == &grpc_connected_channel_filter);
+ grpc_transport_destroy (cd->transport, closure_list);
}
-static char *con_get_peer(grpc_call_element *elem,
- grpc_closure_list *closure_list) {
+static char *
+con_get_peer (grpc_call_element * elem, grpc_closure_list * closure_list)
+{
channel_data *chand = elem->channel_data;
- return grpc_transport_get_peer(chand->transport, closure_list);
+ return grpc_transport_get_peer (chand->transport, closure_list);
}
const grpc_channel_filter grpc_connected_channel_filter = {
- con_start_transport_stream_op,
- con_start_transport_op,
- sizeof(call_data),
- init_call_elem,
- destroy_call_elem,
- sizeof(channel_data),
- init_channel_elem,
- destroy_channel_elem,
- con_get_peer,
- "connected",
+ con_start_transport_stream_op,
+ con_start_transport_op,
+ sizeof (call_data),
+ init_call_elem,
+ destroy_call_elem,
+ sizeof (channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ con_get_peer,
+ "connected",
};
-void grpc_connected_channel_bind_transport(grpc_channel_stack *channel_stack,
- grpc_transport *transport) {
+void
+grpc_connected_channel_bind_transport (grpc_channel_stack * channel_stack, grpc_transport * transport)
+{
/* Assumes that the connected channel filter is always the last filter
in a channel stack */
- grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
- channel_data *cd = (channel_data *)elem->channel_data;
- GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
- GPR_ASSERT(cd->transport == NULL);
+ grpc_channel_element *elem = grpc_channel_stack_last_element (channel_stack);
+ channel_data *cd = (channel_data *) elem->channel_data;
+ GPR_ASSERT (elem->filter == &grpc_connected_channel_filter);
+ GPR_ASSERT (cd->transport == NULL);
cd->transport = transport;
/* HACK(ctiller): increase call stack size for the channel to make space
@@ -161,5 +162,5 @@ void grpc_connected_channel_bind_transport(grpc_channel_stack *channel_stack,
This is only "safe" because call stacks place no additional data after
the last call element, and the last call element MUST be the connected
channel. */
- channel_stack->call_stack_size += grpc_transport_stream_size(transport);
+ channel_stack->call_stack_size += grpc_transport_stream_size (transport);
}
diff --git a/src/core/channel/connected_channel.h b/src/core/channel/connected_channel.h
index b615b0d350..67bcd6c911 100644
--- a/src/core/channel/connected_channel.h
+++ b/src/core/channel/connected_channel.h
@@ -43,7 +43,6 @@ extern const grpc_channel_filter grpc_connected_channel_filter;
/* Post construction fixup: set the transport in the connected channel.
Must be called before any call stack using this filter is used. */
-void grpc_connected_channel_bind_transport(grpc_channel_stack *channel_stack,
- grpc_transport *transport);
+void grpc_connected_channel_bind_transport (grpc_channel_stack * channel_stack, grpc_transport * transport);
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CONNECTED_CHANNEL_H */
diff --git a/src/core/channel/context.h b/src/core/channel/context.h
index ac5796b9ef..076dc7189b 100644
--- a/src/core/channel/context.h
+++ b/src/core/channel/context.h
@@ -35,15 +35,17 @@
#define GRPC_INTERNAL_CORE_CHANNEL_CONTEXT_H
/* Call object context pointers */
-typedef enum {
+typedef enum
+{
GRPC_CONTEXT_SECURITY = 0,
GRPC_CONTEXT_TRACING,
GRPC_CONTEXT_COUNT
} grpc_context_index;
-typedef struct {
+typedef struct
+{
void *value;
- void (*destroy)(void *);
+ void (*destroy) (void *);
} grpc_call_context_element;
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CONTEXT_H */
diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c
index 6fbe4738e7..1a166833e7 100644
--- a/src/core/channel/http_client_filter.c
+++ b/src/core/channel/http_client_filter.c
@@ -37,7 +37,8 @@
#include <grpc/support/string_util.h>
#include "src/core/support/string.h"
-typedef struct call_data {
+typedef struct call_data
+{
grpc_linked_mdelem method;
grpc_linked_mdelem scheme;
grpc_linked_mdelem authority;
@@ -57,7 +58,8 @@ typedef struct call_data {
grpc_closure hc_on_recv;
} call_data;
-typedef struct channel_data {
+typedef struct channel_data
+{
grpc_mdelem *te_trailers;
grpc_mdelem *method;
grpc_mdelem *scheme;
@@ -67,224 +69,255 @@ typedef struct channel_data {
grpc_mdelem *user_agent;
} channel_data;
-typedef struct {
+typedef struct
+{
grpc_call_element *elem;
grpc_closure_list *closure_list;
} client_recv_filter_args;
-static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) {
+static grpc_mdelem *
+client_recv_filter (void *user_data, grpc_mdelem * md)
+{
client_recv_filter_args *a = user_data;
grpc_call_element *elem = a->elem;
channel_data *channeld = elem->channel_data;
- if (md == channeld->status) {
- return NULL;
- } else if (md->key == channeld->status->key) {
- grpc_call_element_send_cancel(elem, a->closure_list);
- return NULL;
- } else if (md->key == channeld->content_type->key) {
- return NULL;
- }
+ if (md == channeld->status)
+ {
+ return NULL;
+ }
+ else if (md->key == channeld->status->key)
+ {
+ grpc_call_element_send_cancel (elem, a->closure_list);
+ return NULL;
+ }
+ else if (md->key == channeld->content_type->key)
+ {
+ return NULL;
+ }
return md;
}
-static void hc_on_recv(void *user_data, int success,
- grpc_closure_list *closure_list) {
+static void
+hc_on_recv (void *user_data, int success, grpc_closure_list * closure_list)
+{
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
size_t i;
size_t nops = calld->recv_ops->nops;
grpc_stream_op *ops = calld->recv_ops->ops;
- for (i = 0; i < nops; i++) {
- grpc_stream_op *op = &ops[i];
- client_recv_filter_args a;
- if (op->type != GRPC_OP_METADATA) continue;
- calld->got_initial_metadata = 1;
- a.elem = elem;
- a.closure_list = closure_list;
- grpc_metadata_batch_filter(&op->data.metadata, client_recv_filter, &a);
- }
- calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success, closure_list);
+ for (i = 0; i < nops; i++)
+ {
+ grpc_stream_op *op = &ops[i];
+ client_recv_filter_args a;
+ if (op->type != GRPC_OP_METADATA)
+ continue;
+ calld->got_initial_metadata = 1;
+ a.elem = elem;
+ a.closure_list = closure_list;
+ grpc_metadata_batch_filter (&op->data.metadata, client_recv_filter, &a);
+ }
+ calld->on_done_recv->cb (calld->on_done_recv->cb_arg, success, closure_list);
}
-static grpc_mdelem *client_strip_filter(void *user_data, grpc_mdelem *md) {
+static grpc_mdelem *
+client_strip_filter (void *user_data, grpc_mdelem * md)
+{
grpc_call_element *elem = user_data;
channel_data *channeld = elem->channel_data;
/* eat the things we'd like to set ourselves */
- if (md->key == channeld->method->key) return NULL;
- if (md->key == channeld->scheme->key) return NULL;
- if (md->key == channeld->te_trailers->key) return NULL;
- if (md->key == channeld->content_type->key) return NULL;
- if (md->key == channeld->user_agent->key) return NULL;
+ if (md->key == channeld->method->key)
+ return NULL;
+ if (md->key == channeld->scheme->key)
+ return NULL;
+ if (md->key == channeld->te_trailers->key)
+ return NULL;
+ if (md->key == channeld->content_type->key)
+ return NULL;
+ if (md->key == channeld->user_agent->key)
+ return NULL;
return md;
}
-static void hc_mutate_op(grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+static void
+hc_mutate_op (grpc_call_element * elem, grpc_transport_stream_op * op)
+{
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
size_t i;
- if (op->send_ops && !calld->sent_initial_metadata) {
- size_t nops = op->send_ops->nops;
- grpc_stream_op *ops = op->send_ops->ops;
- for (i = 0; i < nops; i++) {
- grpc_stream_op *op = &ops[i];
- if (op->type != GRPC_OP_METADATA) continue;
- calld->sent_initial_metadata = 1;
- grpc_metadata_batch_filter(&op->data.metadata, client_strip_filter, elem);
- /* Send : prefixed headers, which have to be before any application
- layer headers. */
- grpc_metadata_batch_add_head(&op->data.metadata, &calld->method,
- GRPC_MDELEM_REF(channeld->method));
- grpc_metadata_batch_add_head(&op->data.metadata, &calld->scheme,
- GRPC_MDELEM_REF(channeld->scheme));
- grpc_metadata_batch_add_tail(&op->data.metadata, &calld->te_trailers,
- GRPC_MDELEM_REF(channeld->te_trailers));
- grpc_metadata_batch_add_tail(&op->data.metadata, &calld->content_type,
- GRPC_MDELEM_REF(channeld->content_type));
- grpc_metadata_batch_add_tail(&op->data.metadata, &calld->user_agent,
- GRPC_MDELEM_REF(channeld->user_agent));
- break;
+ if (op->send_ops && !calld->sent_initial_metadata)
+ {
+ size_t nops = op->send_ops->nops;
+ grpc_stream_op *ops = op->send_ops->ops;
+ for (i = 0; i < nops; i++)
+ {
+ grpc_stream_op *op = &ops[i];
+ if (op->type != GRPC_OP_METADATA)
+ continue;
+ calld->sent_initial_metadata = 1;
+ grpc_metadata_batch_filter (&op->data.metadata, client_strip_filter, elem);
+ /* Send : prefixed headers, which have to be before any application
+ layer headers. */
+ grpc_metadata_batch_add_head (&op->data.metadata, &calld->method, GRPC_MDELEM_REF (channeld->method));
+ grpc_metadata_batch_add_head (&op->data.metadata, &calld->scheme, GRPC_MDELEM_REF (channeld->scheme));
+ grpc_metadata_batch_add_tail (&op->data.metadata, &calld->te_trailers, GRPC_MDELEM_REF (channeld->te_trailers));
+ grpc_metadata_batch_add_tail (&op->data.metadata, &calld->content_type, GRPC_MDELEM_REF (channeld->content_type));
+ grpc_metadata_batch_add_tail (&op->data.metadata, &calld->user_agent, GRPC_MDELEM_REF (channeld->user_agent));
+ break;
+ }
}
- }
- if (op->recv_ops && !calld->got_initial_metadata) {
- /* substitute our callback for the higher callback */
- calld->recv_ops = op->recv_ops;
- calld->on_done_recv = op->on_done_recv;
- op->on_done_recv = &calld->hc_on_recv;
- }
+ if (op->recv_ops && !calld->got_initial_metadata)
+ {
+ /* substitute our callback for the higher callback */
+ calld->recv_ops = op->recv_ops;
+ calld->on_done_recv = op->on_done_recv;
+ op->on_done_recv = &calld->hc_on_recv;
+ }
}
-static void hc_start_transport_op(grpc_call_element *elem,
- grpc_transport_stream_op *op,
- grpc_closure_list *closure_list) {
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- hc_mutate_op(elem, op);
- grpc_call_next_op(elem, op, closure_list);
+static void
+hc_start_transport_op (grpc_call_element * elem, grpc_transport_stream_op * op, grpc_closure_list * closure_list)
+{
+ GRPC_CALL_LOG_OP (GPR_INFO, elem, op);
+ hc_mutate_op (elem, op);
+ grpc_call_next_op (elem, op, closure_list);
}
/* Constructor for call_data */
-static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data,
- grpc_transport_stream_op *initial_op,
- grpc_closure_list *closure_list) {
+static void
+init_call_elem (grpc_call_element * elem, const void *server_transport_data, grpc_transport_stream_op * initial_op, grpc_closure_list * closure_list)
+{
call_data *calld = elem->call_data;
calld->sent_initial_metadata = 0;
calld->got_initial_metadata = 0;
calld->on_done_recv = NULL;
- grpc_closure_init(&calld->hc_on_recv, hc_on_recv, elem);
- if (initial_op) hc_mutate_op(elem, initial_op);
+ grpc_closure_init (&calld->hc_on_recv, hc_on_recv, elem);
+ if (initial_op)
+ hc_mutate_op (elem, initial_op);
}
/* Destructor for call_data */
-static void destroy_call_elem(grpc_call_element *elem,
- grpc_closure_list *closure_list) {}
+static void
+destroy_call_elem (grpc_call_element * elem, grpc_closure_list * closure_list)
+{
+}
-static const char *scheme_from_args(const grpc_channel_args *args) {
+static const char *
+scheme_from_args (const grpc_channel_args * args)
+{
unsigned i;
- if (args != NULL) {
- for (i = 0; i < args->num_args; ++i) {
- if (args->args[i].type == GRPC_ARG_STRING &&
- strcmp(args->args[i].key, GRPC_ARG_HTTP2_SCHEME) == 0) {
- return args->args[i].value.string;
- }
+ if (args != NULL)
+ {
+ for (i = 0; i < args->num_args; ++i)
+ {
+ if (args->args[i].type == GRPC_ARG_STRING && strcmp (args->args[i].key, GRPC_ARG_HTTP2_SCHEME) == 0)
+ {
+ return args->args[i].value.string;
+ }
+ }
}
- }
return "http";
}
-static grpc_mdstr *user_agent_from_args(grpc_mdctx *mdctx,
- const grpc_channel_args *args) {
+static grpc_mdstr *
+user_agent_from_args (grpc_mdctx * mdctx, const grpc_channel_args * args)
+{
gpr_strvec v;
size_t i;
int is_first = 1;
char *tmp;
grpc_mdstr *result;
- gpr_strvec_init(&v);
+ gpr_strvec_init (&v);
- for (i = 0; args && i < args->num_args; i++) {
- if (0 == strcmp(args->args[i].key, GRPC_ARG_PRIMARY_USER_AGENT_STRING)) {
- if (args->args[i].type != GRPC_ARG_STRING) {
- gpr_log(GPR_ERROR, "Channel argument '%s' should be a string",
- GRPC_ARG_PRIMARY_USER_AGENT_STRING);
- } else {
- if (!is_first) gpr_strvec_add(&v, gpr_strdup(" "));
- is_first = 0;
- gpr_strvec_add(&v, gpr_strdup(args->args[i].value.string));
- }
+ for (i = 0; args && i < args->num_args; i++)
+ {
+ if (0 == strcmp (args->args[i].key, GRPC_ARG_PRIMARY_USER_AGENT_STRING))
+ {
+ if (args->args[i].type != GRPC_ARG_STRING)
+ {
+ gpr_log (GPR_ERROR, "Channel argument '%s' should be a string", GRPC_ARG_PRIMARY_USER_AGENT_STRING);
+ }
+ else
+ {
+ if (!is_first)
+ gpr_strvec_add (&v, gpr_strdup (" "));
+ is_first = 0;
+ gpr_strvec_add (&v, gpr_strdup (args->args[i].value.string));
+ }
+ }
}
- }
- gpr_asprintf(&tmp, "%sgrpc-c/%s (%s)", is_first ? "" : " ",
- grpc_version_string(), GPR_PLATFORM_STRING);
+ gpr_asprintf (&tmp, "%sgrpc-c/%s (%s)", is_first ? "" : " ", grpc_version_string (), GPR_PLATFORM_STRING);
is_first = 0;
- gpr_strvec_add(&v, tmp);
+ gpr_strvec_add (&v, tmp);
- for (i = 0; args && i < args->num_args; i++) {
- if (0 == strcmp(args->args[i].key, GRPC_ARG_SECONDARY_USER_AGENT_STRING)) {
- if (args->args[i].type != GRPC_ARG_STRING) {
- gpr_log(GPR_ERROR, "Channel argument '%s' should be a string",
- GRPC_ARG_SECONDARY_USER_AGENT_STRING);
- } else {
- if (!is_first) gpr_strvec_add(&v, gpr_strdup(" "));
- is_first = 0;
- gpr_strvec_add(&v, gpr_strdup(args->args[i].value.string));
- }
+ for (i = 0; args && i < args->num_args; i++)
+ {
+ if (0 == strcmp (args->args[i].key, GRPC_ARG_SECONDARY_USER_AGENT_STRING))
+ {
+ if (args->args[i].type != GRPC_ARG_STRING)
+ {
+ gpr_log (GPR_ERROR, "Channel argument '%s' should be a string", GRPC_ARG_SECONDARY_USER_AGENT_STRING);
+ }
+ else
+ {
+ if (!is_first)
+ gpr_strvec_add (&v, gpr_strdup (" "));
+ is_first = 0;
+ gpr_strvec_add (&v, gpr_strdup (args->args[i].value.string));
+ }
+ }
}
- }
- tmp = gpr_strvec_flatten(&v, NULL);
- gpr_strvec_destroy(&v);
- result = grpc_mdstr_from_string(mdctx, tmp, 0);
- gpr_free(tmp);
+ tmp = gpr_strvec_flatten (&v, NULL);
+ gpr_strvec_destroy (&v);
+ result = grpc_mdstr_from_string (mdctx, tmp, 0);
+ gpr_free (tmp);
return result;
}
/* Constructor for channel_data */
-static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
- const grpc_channel_args *channel_args,
- grpc_mdctx *mdctx, int is_first, int is_last,
- grpc_closure_list *closure_list) {
+static void
+init_channel_elem (grpc_channel_element * elem, grpc_channel * master, const grpc_channel_args * channel_args, grpc_mdctx * mdctx, int is_first, int is_last, grpc_closure_list * closure_list)
+{
/* grab pointers to our data from the channel element */
channel_data *channeld = elem->channel_data;
/* The first and the last filters tend to be implemented differently to
handle the case that there's no 'next' filter to call on the up or down
path */
- GPR_ASSERT(!is_last);
+ GPR_ASSERT (!is_last);
/* initialize members */
- channeld->te_trailers = grpc_mdelem_from_strings(mdctx, "te", "trailers");
- channeld->method = grpc_mdelem_from_strings(mdctx, ":method", "POST");
- channeld->scheme = grpc_mdelem_from_strings(mdctx, ":scheme",
- scheme_from_args(channel_args));
- channeld->content_type =
- grpc_mdelem_from_strings(mdctx, "content-type", "application/grpc");
- channeld->status = grpc_mdelem_from_strings(mdctx, ":status", "200");
- channeld->user_agent = grpc_mdelem_from_metadata_strings(
- mdctx, grpc_mdstr_from_string(mdctx, "user-agent", 0),
- user_agent_from_args(mdctx, channel_args));
+ channeld->te_trailers = grpc_mdelem_from_strings (mdctx, "te", "trailers");
+ channeld->method = grpc_mdelem_from_strings (mdctx, ":method", "POST");
+ channeld->scheme = grpc_mdelem_from_strings (mdctx, ":scheme", scheme_from_args (channel_args));
+ channeld->content_type = grpc_mdelem_from_strings (mdctx, "content-type", "application/grpc");
+ channeld->status = grpc_mdelem_from_strings (mdctx, ":status", "200");
+ channeld->user_agent = grpc_mdelem_from_metadata_strings (mdctx, grpc_mdstr_from_string (mdctx, "user-agent", 0), user_agent_from_args (mdctx, channel_args));
}
/* Destructor for channel data */
-static void destroy_channel_elem(grpc_channel_element *elem,
- grpc_closure_list *closure_list) {
+static void
+destroy_channel_elem (grpc_channel_element * elem, grpc_closure_list * closure_list)
+{
/* grab pointers to our data from the channel element */
channel_data *channeld = elem->channel_data;
- GRPC_MDELEM_UNREF(channeld->te_trailers);
- GRPC_MDELEM_UNREF(channeld->method);
- GRPC_MDELEM_UNREF(channeld->scheme);
- GRPC_MDELEM_UNREF(channeld->content_type);
- GRPC_MDELEM_UNREF(channeld->status);
- GRPC_MDELEM_UNREF(channeld->user_agent);
+ GRPC_MDELEM_UNREF (channeld->te_trailers);
+ GRPC_MDELEM_UNREF (channeld->method);
+ GRPC_MDELEM_UNREF (channeld->scheme);
+ GRPC_MDELEM_UNREF (channeld->content_type);
+ GRPC_MDELEM_UNREF (channeld->status);
+ GRPC_MDELEM_UNREF (channeld->user_agent);
}
const grpc_channel_filter grpc_http_client_filter = {
- hc_start_transport_op, grpc_channel_next_op, sizeof(call_data),
- init_call_elem, destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer,
- "http-client"};
+ hc_start_transport_op, grpc_channel_next_op, sizeof (call_data),
+ init_call_elem, destroy_call_elem, sizeof (channel_data),
+ init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer,
+ "http-client"
+};
diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c
index fb1f0b0554..949868b403 100644
--- a/src/core/channel/http_server_filter.c
+++ b/src/core/channel/http_server_filter.c
@@ -37,7 +37,8 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-typedef struct call_data {
+typedef struct call_data
+{
gpr_uint8 got_initial_metadata;
gpr_uint8 seen_path;
gpr_uint8 seen_post;
@@ -57,7 +58,8 @@ typedef struct call_data {
grpc_closure hs_on_recv;
} call_data;
-typedef struct channel_data {
+typedef struct channel_data
+{
grpc_mdelem *te_trailers;
grpc_mdelem *method_post;
grpc_mdelem *http_scheme;
@@ -74,236 +76,268 @@ typedef struct channel_data {
grpc_mdctx *mdctx;
} channel_data;
-typedef struct {
+typedef struct
+{
grpc_call_element *elem;
grpc_closure_list *closure_list;
} server_filter_args;
-static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
+static grpc_mdelem *
+server_filter (void *user_data, grpc_mdelem * md)
+{
server_filter_args *a = user_data;
grpc_call_element *elem = a->elem;
channel_data *channeld = elem->channel_data;
call_data *calld = elem->call_data;
/* Check if it is one of the headers we care about. */
- if (md == channeld->te_trailers || md == channeld->method_post ||
- md == channeld->http_scheme || md == channeld->https_scheme ||
- md == channeld->grpc_scheme || md == channeld->content_type) {
- /* swallow it */
- if (md == channeld->method_post) {
- calld->seen_post = 1;
- } else if (md->key == channeld->http_scheme->key) {
- calld->seen_scheme = 1;
- } else if (md == channeld->te_trailers) {
- calld->seen_te_trailers = 1;
+ if (md == channeld->te_trailers || md == channeld->method_post || md == channeld->http_scheme || md == channeld->https_scheme || md == channeld->grpc_scheme || md == channeld->content_type)
+ {
+ /* swallow it */
+ if (md == channeld->method_post)
+ {
+ calld->seen_post = 1;
+ }
+ else if (md->key == channeld->http_scheme->key)
+ {
+ calld->seen_scheme = 1;
+ }
+ else if (md == channeld->te_trailers)
+ {
+ calld->seen_te_trailers = 1;
+ }
+ /* TODO(klempner): Track that we've seen all the headers we should
+ require */
+ return NULL;
}
- /* TODO(klempner): Track that we've seen all the headers we should
- require */
- return NULL;
- } else if (md->key == channeld->content_type->key) {
- if (strncmp(grpc_mdstr_as_c_string(md->value), "application/grpc+", 17) ==
- 0) {
- /* Although the C implementation doesn't (currently) generate them,
- any custom +-suffix is explicitly valid. */
- /* TODO(klempner): We should consider preallocating common values such
- as +proto or +json, or at least stashing them if we see them. */
- /* TODO(klempner): Should we be surfacing this to application code? */
- } else {
- /* TODO(klempner): We're currently allowing this, but we shouldn't
- see it without a proxy so log for now. */
- gpr_log(GPR_INFO, "Unexpected content-type %s",
- channeld->content_type->key);
+ else if (md->key == channeld->content_type->key)
+ {
+ if (strncmp (grpc_mdstr_as_c_string (md->value), "application/grpc+", 17) == 0)
+ {
+ /* Although the C implementation doesn't (currently) generate them,
+ any custom +-suffix is explicitly valid. */
+ /* TODO(klempner): We should consider preallocating common values such
+ as +proto or +json, or at least stashing them if we see them. */
+ /* TODO(klempner): Should we be surfacing this to application code? */
+ }
+ else
+ {
+ /* TODO(klempner): We're currently allowing this, but we shouldn't
+ see it without a proxy so log for now. */
+ gpr_log (GPR_INFO, "Unexpected content-type %s", channeld->content_type->key);
+ }
+ return NULL;
}
- return NULL;
- } else if (md->key == channeld->te_trailers->key ||
- md->key == channeld->method_post->key ||
- md->key == channeld->http_scheme->key) {
- gpr_log(GPR_ERROR, "Invalid %s: header: '%s'",
- grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value));
- /* swallow it and error everything out. */
- /* TODO(klempner): We ought to generate more descriptive error messages
- on the wire here. */
- grpc_call_element_send_cancel(elem, a->closure_list);
- return NULL;
- } else if (md->key == channeld->path_key) {
- if (calld->seen_path) {
- gpr_log(GPR_ERROR, "Received :path twice");
+ else if (md->key == channeld->te_trailers->key || md->key == channeld->method_post->key || md->key == channeld->http_scheme->key)
+ {
+ gpr_log (GPR_ERROR, "Invalid %s: header: '%s'", grpc_mdstr_as_c_string (md->key), grpc_mdstr_as_c_string (md->value));
+ /* swallow it and error everything out. */
+ /* TODO(klempner): We ought to generate more descriptive error messages
+ on the wire here. */
+ grpc_call_element_send_cancel (elem, a->closure_list);
return NULL;
}
- calld->seen_path = 1;
- return md;
- } else if (md->key == channeld->authority_key) {
- calld->seen_authority = 1;
- return md;
- } else if (md->key == channeld->host_key) {
- /* translate host to :authority since :authority may be
- omitted */
- grpc_mdelem *authority = grpc_mdelem_from_metadata_strings(
- channeld->mdctx, GRPC_MDSTR_REF(channeld->authority_key),
- GRPC_MDSTR_REF(md->value));
- GRPC_MDELEM_UNREF(md);
- calld->seen_authority = 1;
- return authority;
- } else {
- return md;
- }
+ else if (md->key == channeld->path_key)
+ {
+ if (calld->seen_path)
+ {
+ gpr_log (GPR_ERROR, "Received :path twice");
+ return NULL;
+ }
+ calld->seen_path = 1;
+ return md;
+ }
+ else if (md->key == channeld->authority_key)
+ {
+ calld->seen_authority = 1;
+ return md;
+ }
+ else if (md->key == channeld->host_key)
+ {
+ /* translate host to :authority since :authority may be
+ omitted */
+ grpc_mdelem *authority = grpc_mdelem_from_metadata_strings (channeld->mdctx, GRPC_MDSTR_REF (channeld->authority_key),
+ GRPC_MDSTR_REF (md->value));
+ GRPC_MDELEM_UNREF (md);
+ calld->seen_authority = 1;
+ return authority;
+ }
+ else
+ {
+ return md;
+ }
}
-static void hs_on_recv(void *user_data, int success,
- grpc_closure_list *closure_list) {
+static void
+hs_on_recv (void *user_data, int success, grpc_closure_list * closure_list)
+{
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
- if (success) {
- size_t i;
- size_t nops = calld->recv_ops->nops;
- grpc_stream_op *ops = calld->recv_ops->ops;
- for (i = 0; i < nops; i++) {
- grpc_stream_op *op = &ops[i];
- server_filter_args a;
- if (op->type != GRPC_OP_METADATA) continue;
- calld->got_initial_metadata = 1;
- a.elem = elem;
- a.closure_list = closure_list;
- grpc_metadata_batch_filter(&op->data.metadata, server_filter, &a);
- /* Have we seen the required http2 transport headers?
- (:method, :scheme, content-type, with :path and :authority covered
- at the channel level right now) */
- if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers &&
- calld->seen_path && calld->seen_authority) {
- /* do nothing */
- } else {
- if (!calld->seen_path) {
- gpr_log(GPR_ERROR, "Missing :path header");
- }
- if (!calld->seen_authority) {
- gpr_log(GPR_ERROR, "Missing :authority header");
- }
- if (!calld->seen_post) {
- gpr_log(GPR_ERROR, "Missing :method header");
- }
- if (!calld->seen_scheme) {
- gpr_log(GPR_ERROR, "Missing :scheme header");
- }
- if (!calld->seen_te_trailers) {
- gpr_log(GPR_ERROR, "Missing te trailers header");
- }
- /* Error this call out */
- success = 0;
- grpc_call_element_send_cancel(elem, closure_list);
- }
+ if (success)
+ {
+ size_t i;
+ size_t nops = calld->recv_ops->nops;
+ grpc_stream_op *ops = calld->recv_ops->ops;
+ for (i = 0; i < nops; i++)
+ {
+ grpc_stream_op *op = &ops[i];
+ server_filter_args a;
+ if (op->type != GRPC_OP_METADATA)
+ continue;
+ calld->got_initial_metadata = 1;
+ a.elem = elem;
+ a.closure_list = closure_list;
+ grpc_metadata_batch_filter (&op->data.metadata, server_filter, &a);
+ /* Have we seen the required http2 transport headers?
+ (:method, :scheme, content-type, with :path and :authority covered
+ at the channel level right now) */
+ if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers && calld->seen_path && calld->seen_authority)
+ {
+ /* do nothing */
+ }
+ else
+ {
+ if (!calld->seen_path)
+ {
+ gpr_log (GPR_ERROR, "Missing :path header");
+ }
+ if (!calld->seen_authority)
+ {
+ gpr_log (GPR_ERROR, "Missing :authority header");
+ }
+ if (!calld->seen_post)
+ {
+ gpr_log (GPR_ERROR, "Missing :method header");
+ }
+ if (!calld->seen_scheme)
+ {
+ gpr_log (GPR_ERROR, "Missing :scheme header");
+ }
+ if (!calld->seen_te_trailers)
+ {
+ gpr_log (GPR_ERROR, "Missing te trailers header");
+ }
+ /* Error this call out */
+ success = 0;
+ grpc_call_element_send_cancel (elem, closure_list);
+ }
+ }
}
- }
- calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success, closure_list);
+ calld->on_done_recv->cb (calld->on_done_recv->cb_arg, success, closure_list);
}
-static void hs_mutate_op(grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+static void
+hs_mutate_op (grpc_call_element * elem, grpc_transport_stream_op * op)
+{
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
size_t i;
- if (op->send_ops && !calld->sent_status) {
- size_t nops = op->send_ops->nops;
- grpc_stream_op *ops = op->send_ops->ops;
- for (i = 0; i < nops; i++) {
- grpc_stream_op *op = &ops[i];
- if (op->type != GRPC_OP_METADATA) continue;
- calld->sent_status = 1;
- grpc_metadata_batch_add_head(&op->data.metadata, &calld->status,
- GRPC_MDELEM_REF(channeld->status_ok));
- grpc_metadata_batch_add_tail(&op->data.metadata, &calld->content_type,
- GRPC_MDELEM_REF(channeld->content_type));
- break;
+ if (op->send_ops && !calld->sent_status)
+ {
+ size_t nops = op->send_ops->nops;
+ grpc_stream_op *ops = op->send_ops->ops;
+ for (i = 0; i < nops; i++)
+ {
+ grpc_stream_op *op = &ops[i];
+ if (op->type != GRPC_OP_METADATA)
+ continue;
+ calld->sent_status = 1;
+ grpc_metadata_batch_add_head (&op->data.metadata, &calld->status, GRPC_MDELEM_REF (channeld->status_ok));
+ grpc_metadata_batch_add_tail (&op->data.metadata, &calld->content_type, GRPC_MDELEM_REF (channeld->content_type));
+ break;
+ }
}
- }
- if (op->recv_ops && !calld->got_initial_metadata) {
- /* substitute our callback for the higher callback */
- calld->recv_ops = op->recv_ops;
- calld->on_done_recv = op->on_done_recv;
- op->on_done_recv = &calld->hs_on_recv;
- }
+ if (op->recv_ops && !calld->got_initial_metadata)
+ {
+ /* substitute our callback for the higher callback */
+ calld->recv_ops = op->recv_ops;
+ calld->on_done_recv = op->on_done_recv;
+ op->on_done_recv = &calld->hs_on_recv;
+ }
}
-static void hs_start_transport_op(grpc_call_element *elem,
- grpc_transport_stream_op *op,
- grpc_closure_list *closure_list) {
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- hs_mutate_op(elem, op);
- grpc_call_next_op(elem, op, closure_list);
+static void
+hs_start_transport_op (grpc_call_element * elem, grpc_transport_stream_op * op, grpc_closure_list * closure_list)
+{
+ GRPC_CALL_LOG_OP (GPR_INFO, elem, op);
+ hs_mutate_op (elem, op);
+ grpc_call_next_op (elem, op, closure_list);
}
/* Constructor for call_data */
-static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data,
- grpc_transport_stream_op *initial_op,
- grpc_closure_list *closure_list) {
+static void
+init_call_elem (grpc_call_element * elem, const void *server_transport_data, grpc_transport_stream_op * initial_op, grpc_closure_list * closure_list)
+{
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
/* initialize members */
- memset(calld, 0, sizeof(*calld));
- grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem);
- if (initial_op) hs_mutate_op(elem, initial_op);
+ memset (calld, 0, sizeof (*calld));
+ grpc_closure_init (&calld->hs_on_recv, hs_on_recv, elem);
+ if (initial_op)
+ hs_mutate_op (elem, initial_op);
}
/* Destructor for call_data */
-static void destroy_call_elem(grpc_call_element *elem,
- grpc_closure_list *closure_list) {}
+static void
+destroy_call_elem (grpc_call_element * elem, grpc_closure_list * closure_list)
+{
+}
/* Constructor for channel_data */
-static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
- const grpc_channel_args *args, grpc_mdctx *mdctx,
- int is_first, int is_last,
- grpc_closure_list *closure_list) {
+static void
+init_channel_elem (grpc_channel_element * elem, grpc_channel * master, const grpc_channel_args * args, grpc_mdctx * mdctx, int is_first, int is_last, grpc_closure_list * closure_list)
+{
/* grab pointers to our data from the channel element */
channel_data *channeld = elem->channel_data;
/* The first and the last filters tend to be implemented differently to
handle the case that there's no 'next' filter to call on the up or down
path */
- GPR_ASSERT(!is_first);
- GPR_ASSERT(!is_last);
+ GPR_ASSERT (!is_first);
+ GPR_ASSERT (!is_last);
/* initialize members */
- channeld->te_trailers = grpc_mdelem_from_strings(mdctx, "te", "trailers");
- channeld->status_ok = grpc_mdelem_from_strings(mdctx, ":status", "200");
- channeld->status_not_found =
- grpc_mdelem_from_strings(mdctx, ":status", "404");
- channeld->method_post = grpc_mdelem_from_strings(mdctx, ":method", "POST");
- channeld->http_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "http");
- channeld->https_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "https");
- channeld->grpc_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "grpc");
- channeld->path_key = grpc_mdstr_from_string(mdctx, ":path", 0);
- channeld->authority_key = grpc_mdstr_from_string(mdctx, ":authority", 0);
- channeld->host_key = grpc_mdstr_from_string(mdctx, "host", 0);
- channeld->content_type =
- grpc_mdelem_from_strings(mdctx, "content-type", "application/grpc");
+ channeld->te_trailers = grpc_mdelem_from_strings (mdctx, "te", "trailers");
+ channeld->status_ok = grpc_mdelem_from_strings (mdctx, ":status", "200");
+ channeld->status_not_found = grpc_mdelem_from_strings (mdctx, ":status", "404");
+ channeld->method_post = grpc_mdelem_from_strings (mdctx, ":method", "POST");
+ channeld->http_scheme = grpc_mdelem_from_strings (mdctx, ":scheme", "http");
+ channeld->https_scheme = grpc_mdelem_from_strings (mdctx, ":scheme", "https");
+ channeld->grpc_scheme = grpc_mdelem_from_strings (mdctx, ":scheme", "grpc");
+ channeld->path_key = grpc_mdstr_from_string (mdctx, ":path", 0);
+ channeld->authority_key = grpc_mdstr_from_string (mdctx, ":authority", 0);
+ channeld->host_key = grpc_mdstr_from_string (mdctx, "host", 0);
+ channeld->content_type = grpc_mdelem_from_strings (mdctx, "content-type", "application/grpc");
channeld->mdctx = mdctx;
}
/* Destructor for channel data */
-static void destroy_channel_elem(grpc_channel_element *elem,
- grpc_closure_list *closure_list) {
+static void
+destroy_channel_elem (grpc_channel_element * elem, grpc_closure_list * closure_list)
+{
/* grab pointers to our data from the channel element */
channel_data *channeld = elem->channel_data;
- GRPC_MDELEM_UNREF(channeld->te_trailers);
- GRPC_MDELEM_UNREF(channeld->status_ok);
- GRPC_MDELEM_UNREF(channeld->status_not_found);
- GRPC_MDELEM_UNREF(channeld->method_post);
- GRPC_MDELEM_UNREF(channeld->http_scheme);
- GRPC_MDELEM_UNREF(channeld->https_scheme);
- GRPC_MDELEM_UNREF(channeld->grpc_scheme);
- GRPC_MDELEM_UNREF(channeld->content_type);
- GRPC_MDSTR_UNREF(channeld->path_key);
- GRPC_MDSTR_UNREF(channeld->authority_key);
- GRPC_MDSTR_UNREF(channeld->host_key);
+ GRPC_MDELEM_UNREF (channeld->te_trailers);
+ GRPC_MDELEM_UNREF (channeld->status_ok);
+ GRPC_MDELEM_UNREF (channeld->status_not_found);
+ GRPC_MDELEM_UNREF (channeld->method_post);
+ GRPC_MDELEM_UNREF (channeld->http_scheme);
+ GRPC_MDELEM_UNREF (channeld->https_scheme);
+ GRPC_MDELEM_UNREF (channeld->grpc_scheme);
+ GRPC_MDELEM_UNREF (channeld->content_type);
+ GRPC_MDSTR_UNREF (channeld->path_key);
+ GRPC_MDSTR_UNREF (channeld->authority_key);
+ GRPC_MDSTR_UNREF (channeld->host_key);
}
const grpc_channel_filter grpc_http_server_filter = {
- hs_start_transport_op, grpc_channel_next_op, sizeof(call_data),
- init_call_elem, destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer,
- "http-server"};
+ hs_start_transport_op, grpc_channel_next_op, sizeof (call_data),
+ init_call_elem, destroy_call_elem, sizeof (channel_data),
+ init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer,
+ "http-server"
+};
diff --git a/src/core/channel/noop_filter.c b/src/core/channel/noop_filter.c
index e4f6c7f837..a93497c509 100644
--- a/src/core/channel/noop_filter.c
+++ b/src/core/channel/noop_filter.c
@@ -34,25 +34,31 @@
#include "src/core/channel/noop_filter.h"
#include <grpc/support/log.h>
-typedef struct call_data {
- int unused; /* C89 requires at least one struct element */
+typedef struct call_data
+{
+ int unused; /* C89 requires at least one struct element */
} call_data;
-typedef struct channel_data {
- int unused; /* C89 requires at least one struct element */
+typedef struct channel_data
+{
+ int unused; /* C89 requires at least one struct element */
} channel_data;
/* used to silence 'variable not used' warnings */
-static void ignore_unused(void *ignored) {}
+static void
+ignore_unused (void *ignored)
+{
+}
-static void noop_mutate_op(grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+static void
+noop_mutate_op (grpc_call_element * elem, grpc_transport_stream_op * op)
+{
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
- ignore_unused(calld);
- ignore_unused(channeld);
+ ignore_unused (calld);
+ ignore_unused (channeld);
/* do nothing */
}
@@ -62,20 +68,19 @@ static void noop_mutate_op(grpc_call_element *elem,
- a network event (or similar) from below, to receive something
op contains type and call direction information, in addition to the data
that is being sent or received. */
-static void noop_start_transport_stream_op(grpc_call_element *elem,
- grpc_transport_stream_op *op,
- grpc_closure_list *closure_list) {
- noop_mutate_op(elem, op);
+static void
+noop_start_transport_stream_op (grpc_call_element * elem, grpc_transport_stream_op * op, grpc_closure_list * closure_list)
+{
+ noop_mutate_op (elem, op);
/* pass control down the stack */
- grpc_call_next_op(elem, op, closure_list);
+ grpc_call_next_op (elem, op, closure_list);
}
/* Constructor for call_data */
-static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data,
- grpc_transport_stream_op *initial_op,
- grpc_closure_list *closure_list) {
+static void
+init_call_elem (grpc_call_element * elem, const void *server_transport_data, grpc_transport_stream_op * initial_op, grpc_closure_list * closure_list)
+{
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
@@ -83,47 +88,51 @@ static void init_call_elem(grpc_call_element *elem,
/* initialize members */
calld->unused = channeld->unused;
- if (initial_op) noop_mutate_op(elem, initial_op);
+ if (initial_op)
+ noop_mutate_op (elem, initial_op);
}
/* Destructor for call_data */
-static void destroy_call_elem(grpc_call_element *elem,
- grpc_closure_list *closure_list) {}
+static void
+destroy_call_elem (grpc_call_element * elem, grpc_closure_list * closure_list)
+{
+}
/* Constructor for channel_data */
-static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
- const grpc_channel_args *args, grpc_mdctx *mdctx,
- int is_first, int is_last,
- grpc_closure_list *closure_list) {
+static void
+init_channel_elem (grpc_channel_element * elem, grpc_channel * master, const grpc_channel_args * args, grpc_mdctx * mdctx, int is_first, int is_last, grpc_closure_list * closure_list)
+{
/* grab pointers to our data from the channel element */
channel_data *channeld = elem->channel_data;
/* The first and the last filters tend to be implemented differently to
handle the case that there's no 'next' filter to call on the up or down
path */
- GPR_ASSERT(!is_first);
- GPR_ASSERT(!is_last);
+ GPR_ASSERT (!is_first);
+ GPR_ASSERT (!is_last);
/* initialize members */
channeld->unused = 0;
}
/* Destructor for channel data */
-static void destroy_channel_elem(grpc_channel_element *elem,
- grpc_closure_list *closure_list) {
+static void
+destroy_channel_elem (grpc_channel_element * elem, grpc_closure_list * closure_list)
+{
/* grab pointers to our data from the channel element */
channel_data *channeld = elem->channel_data;
- ignore_unused(channeld);
+ ignore_unused (channeld);
}
-const grpc_channel_filter grpc_no_op_filter = {noop_start_transport_stream_op,
- grpc_channel_next_op,
- sizeof(call_data),
- init_call_elem,
- destroy_call_elem,
- sizeof(channel_data),
- init_channel_elem,
- destroy_channel_elem,
- grpc_call_next_get_peer,
- "no-op"};
+const grpc_channel_filter grpc_no_op_filter = { noop_start_transport_stream_op,
+ grpc_channel_next_op,
+ sizeof (call_data),
+ init_call_elem,
+ destroy_call_elem,
+ sizeof (channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ grpc_call_next_get_peer,
+ "no-op"
+};