aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/channel
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/channel')
-rw-r--r--src/core/channel/channel_args.c251
-rw-r--r--src/core/channel/channel_args.h25
-rw-r--r--src/core/channel/channel_stack.c187
-rw-r--r--src/core/channel/channel_stack.h88
-rw-r--r--src/core/channel/client_channel.c1031
-rw-r--r--src/core/channel/client_channel.h22
-rw-r--r--src/core/channel/compress_filter.c451
-rw-r--r--src/core/channel/connected_channel.c118
-rw-r--r--src/core/channel/connected_channel.h3
-rw-r--r--src/core/channel/context.h8
-rw-r--r--src/core/channel/http_client_filter.c313
-rw-r--r--src/core/channel/http_server_filter.c382
-rw-r--r--src/core/channel/noop_filter.c88
13 files changed, 1408 insertions, 1559 deletions
diff --git a/src/core/channel/channel_args.c b/src/core/channel/channel_args.c
index 4207a97114..487db1119a 100644
--- a/src/core/channel/channel_args.c
+++ b/src/core/channel/channel_args.c
@@ -41,206 +41,169 @@
#include <string.h>
-static grpc_arg
-copy_arg (const grpc_arg * src)
-{
+static grpc_arg copy_arg(const grpc_arg *src) {
grpc_arg dst;
dst.type = src->type;
- dst.key = gpr_strdup (src->key);
- switch (dst.type)
- {
+ dst.key = gpr_strdup(src->key);
+ switch (dst.type) {
case GRPC_ARG_STRING:
- dst.value.string = gpr_strdup (src->value.string);
+ dst.value.string = gpr_strdup(src->value.string);
break;
case GRPC_ARG_INTEGER:
dst.value.integer = src->value.integer;
break;
case GRPC_ARG_POINTER:
dst.value.pointer = src->value.pointer;
- dst.value.pointer.p = src->value.pointer.copy ? src->value.pointer.copy (src->value.pointer.p) : src->value.pointer.p;
+ dst.value.pointer.p = src->value.pointer.copy
+ ? src->value.pointer.copy(src->value.pointer.p)
+ : src->value.pointer.p;
break;
- }
+ }
return dst;
}
-grpc_channel_args *
-grpc_channel_args_copy_and_add (const grpc_channel_args * src, const grpc_arg * to_add, size_t num_to_add)
-{
- grpc_channel_args *dst = gpr_malloc (sizeof (grpc_channel_args));
+grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src,
+ const grpc_arg *to_add,
+ size_t num_to_add) {
+ grpc_channel_args *dst = gpr_malloc(sizeof(grpc_channel_args));
size_t i;
size_t src_num_args = (src == NULL) ? 0 : src->num_args;
- if (!src && !to_add)
- {
- dst->num_args = 0;
- dst->args = NULL;
- return dst;
- }
+ if (!src && !to_add) {
+ dst->num_args = 0;
+ dst->args = NULL;
+ return dst;
+ }
dst->num_args = src_num_args + num_to_add;
- dst->args = gpr_malloc (sizeof (grpc_arg) * dst->num_args);
- for (i = 0; i < src_num_args; i++)
- {
- dst->args[i] = copy_arg (&src->args[i]);
- }
- for (i = 0; i < num_to_add; i++)
- {
- dst->args[i + src_num_args] = copy_arg (&to_add[i]);
- }
+ dst->args = gpr_malloc(sizeof(grpc_arg) * dst->num_args);
+ for (i = 0; i < src_num_args; i++) {
+ dst->args[i] = copy_arg(&src->args[i]);
+ }
+ for (i = 0; i < num_to_add; i++) {
+ dst->args[i + src_num_args] = copy_arg(&to_add[i]);
+ }
return dst;
}
-grpc_channel_args *
-grpc_channel_args_copy (const grpc_channel_args * src)
-{
- return grpc_channel_args_copy_and_add (src, NULL, 0);
+grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src) {
+ return grpc_channel_args_copy_and_add(src, NULL, 0);
}
-grpc_channel_args *
-grpc_channel_args_merge (const grpc_channel_args * a, const grpc_channel_args * b)
-{
- return grpc_channel_args_copy_and_add (a, b->args, b->num_args);
+grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a,
+ const grpc_channel_args *b) {
+ return grpc_channel_args_copy_and_add(a, b->args, b->num_args);
}
-void
-grpc_channel_args_destroy (grpc_channel_args * a)
-{
+void grpc_channel_args_destroy(grpc_channel_args *a) {
size_t i;
- for (i = 0; i < a->num_args; i++)
- {
- switch (a->args[i].type)
- {
- case GRPC_ARG_STRING:
- gpr_free (a->args[i].value.string);
- break;
- case GRPC_ARG_INTEGER:
- break;
- case GRPC_ARG_POINTER:
- if (a->args[i].value.pointer.destroy)
- {
- a->args[i].value.pointer.destroy (a->args[i].value.pointer.p);
- }
- break;
- }
- gpr_free (a->args[i].key);
+ for (i = 0; i < a->num_args; i++) {
+ switch (a->args[i].type) {
+ case GRPC_ARG_STRING:
+ gpr_free(a->args[i].value.string);
+ break;
+ case GRPC_ARG_INTEGER:
+ break;
+ case GRPC_ARG_POINTER:
+ if (a->args[i].value.pointer.destroy) {
+ a->args[i].value.pointer.destroy(a->args[i].value.pointer.p);
+ }
+ break;
}
- gpr_free (a->args);
- gpr_free (a);
+ gpr_free(a->args[i].key);
+ }
+ gpr_free(a->args);
+ gpr_free(a);
}
-int
-grpc_channel_args_is_census_enabled (const grpc_channel_args * a)
-{
+int grpc_channel_args_is_census_enabled(const grpc_channel_args *a) {
size_t i;
- if (a == NULL)
- return 0;
- for (i = 0; i < a->num_args; i++)
- {
- if (0 == strcmp (a->args[i].key, GRPC_ARG_ENABLE_CENSUS))
- {
- return a->args[i].value.integer != 0;
- }
+ if (a == NULL) return 0;
+ for (i = 0; i < a->num_args; i++) {
+ if (0 == strcmp(a->args[i].key, GRPC_ARG_ENABLE_CENSUS)) {
+ return a->args[i].value.integer != 0;
}
+ }
return 0;
}
-grpc_compression_algorithm
-grpc_channel_args_get_compression_algorithm (const grpc_channel_args * a)
-{
+grpc_compression_algorithm grpc_channel_args_get_compression_algorithm(
+ const grpc_channel_args *a) {
size_t i;
- if (a == NULL)
- return 0;
- for (i = 0; i < a->num_args; ++i)
- {
- if (a->args[i].type == GRPC_ARG_INTEGER && !strcmp (GRPC_COMPRESSION_ALGORITHM_ARG, a->args[i].key))
- {
- return (grpc_compression_algorithm) a->args[i].value.integer;
- break;
- }
+ if (a == NULL) return 0;
+ for (i = 0; i < a->num_args; ++i) {
+ if (a->args[i].type == GRPC_ARG_INTEGER &&
+ !strcmp(GRPC_COMPRESSION_ALGORITHM_ARG, a->args[i].key)) {
+ return (grpc_compression_algorithm)a->args[i].value.integer;
+ break;
}
+ }
return GRPC_COMPRESS_NONE;
}
-grpc_channel_args *
-grpc_channel_args_set_compression_algorithm (grpc_channel_args * a, grpc_compression_algorithm algorithm)
-{
+grpc_channel_args *grpc_channel_args_set_compression_algorithm(
+ grpc_channel_args *a, grpc_compression_algorithm algorithm) {
grpc_arg tmp;
tmp.type = GRPC_ARG_INTEGER;
tmp.key = GRPC_COMPRESSION_ALGORITHM_ARG;
tmp.value.integer = algorithm;
- return grpc_channel_args_copy_and_add (a, &tmp, 1);
+ return grpc_channel_args_copy_and_add(a, &tmp, 1);
}
/** Returns 1 if the argument for compression algorithm's enabled states bitset
* was found in \a a, returning the arg's value in \a states. Otherwise, returns
* 0. */
-static int
-find_compression_algorithm_states_bitset (const grpc_channel_args * a, int **states_arg)
-{
- if (a != NULL)
- {
- size_t i;
- for (i = 0; i < a->num_args; ++i)
- {
- if (a->args[i].type == GRPC_ARG_INTEGER && !strcmp (GRPC_COMPRESSION_ALGORITHM_STATE_ARG, a->args[i].key))
- {
- *states_arg = &a->args[i].value.integer;
- return 1; /* GPR_TRUE */
- }
- }
+static int find_compression_algorithm_states_bitset(const grpc_channel_args *a,
+ int **states_arg) {
+ if (a != NULL) {
+ size_t i;
+ for (i = 0; i < a->num_args; ++i) {
+ if (a->args[i].type == GRPC_ARG_INTEGER &&
+ !strcmp(GRPC_COMPRESSION_ALGORITHM_STATE_ARG, a->args[i].key)) {
+ *states_arg = &a->args[i].value.integer;
+ return 1; /* GPR_TRUE */
+ }
}
- return 0; /* GPR_FALSE */
+ }
+ return 0; /* GPR_FALSE */
}
-grpc_channel_args *
-grpc_channel_args_compression_algorithm_set_state (grpc_channel_args ** a, grpc_compression_algorithm algorithm, int state)
-{
+grpc_channel_args *grpc_channel_args_compression_algorithm_set_state(
+ grpc_channel_args **a, grpc_compression_algorithm algorithm, int state) {
int *states_arg;
grpc_channel_args *result = *a;
- const int states_arg_found = find_compression_algorithm_states_bitset (*a, &states_arg);
+ const int states_arg_found =
+ find_compression_algorithm_states_bitset(*a, &states_arg);
- if (states_arg_found)
- {
- if (state != 0)
- {
- GPR_BITSET ((unsigned *) states_arg, algorithm);
- }
- else
- {
- GPR_BITCLEAR ((unsigned *) states_arg, algorithm);
- }
+ if (states_arg_found) {
+ if (state != 0) {
+ GPR_BITSET((unsigned *)states_arg, algorithm);
+ } else {
+ GPR_BITCLEAR((unsigned *)states_arg, algorithm);
}
- else
- {
- /* create a new arg */
- grpc_arg tmp;
- tmp.type = GRPC_ARG_INTEGER;
- tmp.key = GRPC_COMPRESSION_ALGORITHM_STATE_ARG;
- /* all enabled by default */
- tmp.value.integer = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1;
- if (state != 0)
- {
- GPR_BITSET ((unsigned *) &tmp.value.integer, algorithm);
- }
- else
- {
- GPR_BITCLEAR ((unsigned *) &tmp.value.integer, algorithm);
- }
- result = grpc_channel_args_copy_and_add (*a, &tmp, 1);
- grpc_channel_args_destroy (*a);
- *a = result;
+ } else {
+ /* create a new arg */
+ grpc_arg tmp;
+ tmp.type = GRPC_ARG_INTEGER;
+ tmp.key = GRPC_COMPRESSION_ALGORITHM_STATE_ARG;
+ /* all enabled by default */
+ tmp.value.integer = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1;
+ if (state != 0) {
+ GPR_BITSET((unsigned *)&tmp.value.integer, algorithm);
+ } else {
+ GPR_BITCLEAR((unsigned *)&tmp.value.integer, algorithm);
}
+ result = grpc_channel_args_copy_and_add(*a, &tmp, 1);
+ grpc_channel_args_destroy(*a);
+ *a = result;
+ }
return result;
}
-int
-grpc_channel_args_compression_algorithm_get_states (const grpc_channel_args * a)
-{
+int grpc_channel_args_compression_algorithm_get_states(
+ const grpc_channel_args *a) {
int *states_arg;
- if (find_compression_algorithm_states_bitset (a, &states_arg))
- {
- return *states_arg;
- }
- else
- {
- return (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; /* All algs. enabled */
- }
+ if (find_compression_algorithm_states_bitset(a, &states_arg)) {
+ return *states_arg;
+ } else {
+ return (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; /* All algs. enabled */
+ }
}
diff --git a/src/core/channel/channel_args.h b/src/core/channel/channel_args.h
index bcfd8c46a0..f9e7b05860 100644
--- a/src/core/channel/channel_args.h
+++ b/src/core/channel/channel_args.h
@@ -38,29 +38,34 @@
#include <grpc/grpc.h>
/* Copy some arguments */
-grpc_channel_args *grpc_channel_args_copy (const grpc_channel_args * src);
+grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src);
/** Copy some arguments and add the to_add parameter in the end.
If to_add is NULL, it is equivalent to call grpc_channel_args_copy. */
-grpc_channel_args *grpc_channel_args_copy_and_add (const grpc_channel_args * src, const grpc_arg * to_add, size_t num_to_add);
+grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src,
+ const grpc_arg *to_add,
+ size_t num_to_add);
/** Copy args from a then args from b into a new channel args */
-grpc_channel_args *grpc_channel_args_merge (const grpc_channel_args * a, const grpc_channel_args * b);
+grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a,
+ const grpc_channel_args *b);
/** Destroy arguments created by grpc_channel_args_copy */
-void grpc_channel_args_destroy (grpc_channel_args * a);
+void grpc_channel_args_destroy(grpc_channel_args *a);
/** Reads census_enabled settings from channel args. Returns 1 if census_enabled
* is specified in channel args, otherwise returns 0. */
-int grpc_channel_args_is_census_enabled (const grpc_channel_args * a);
+int grpc_channel_args_is_census_enabled(const grpc_channel_args *a);
/** Returns the compression algorithm set in \a a. */
-grpc_compression_algorithm grpc_channel_args_get_compression_algorithm (const grpc_channel_args * a);
+grpc_compression_algorithm grpc_channel_args_get_compression_algorithm(
+ const grpc_channel_args *a);
/** Returns a channel arg instance with compression enabled. If \a a is
* non-NULL, its args are copied. N.B. GRPC_COMPRESS_NONE disables compression
* for the channel. */
-grpc_channel_args *grpc_channel_args_set_compression_algorithm (grpc_channel_args * a, grpc_compression_algorithm algorithm);
+grpc_channel_args *grpc_channel_args_set_compression_algorithm(
+ grpc_channel_args *a, grpc_compression_algorithm algorithm);
/** Sets the support for the given compression algorithm. By default, all
* compression algorithms are enabled. It's an error to disable an algorithm set
@@ -69,13 +74,15 @@ grpc_channel_args *grpc_channel_args_set_compression_algorithm (grpc_channel_arg
* Returns an instance will the updated algorithm states. The \a a pointer is
* modified to point to the returned instance (which may be different from the
* input value of \a a). */
-grpc_channel_args *grpc_channel_args_compression_algorithm_set_state (grpc_channel_args ** a, grpc_compression_algorithm algorithm, int enabled);
+grpc_channel_args *grpc_channel_args_compression_algorithm_set_state(
+ grpc_channel_args **a, grpc_compression_algorithm algorithm, int enabled);
/** Returns the bitset representing the support state (true for enabled, false
* for disabled) for compression algorithms.
*
* The i-th bit of the returned bitset corresponds to the i-th entry in the
* grpc_compression_algorithm enum. */
-int grpc_channel_args_compression_algorithm_get_states (const grpc_channel_args * a);
+int grpc_channel_args_compression_algorithm_get_states(
+ const grpc_channel_args *a);
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_ARGS_H */
diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c
index 67ab2c535f..abd7f719e7 100644
--- a/src/core/channel/channel_stack.c
+++ b/src/core/channel/channel_stack.c
@@ -59,20 +59,21 @@ int grpc_trace_channel = 0;
#define ROUND_UP_TO_ALIGNMENT_SIZE(x) \
(((x) + GPR_MAX_ALIGNMENT - 1u) & ~(GPR_MAX_ALIGNMENT - 1u))
-size_t
-grpc_channel_stack_size (const grpc_channel_filter ** filters, size_t filter_count)
-{
+size_t grpc_channel_stack_size(const grpc_channel_filter **filters,
+ size_t filter_count) {
/* always need the header, and size for the channel elements */
- size_t size = ROUND_UP_TO_ALIGNMENT_SIZE (sizeof (grpc_channel_stack)) + ROUND_UP_TO_ALIGNMENT_SIZE (filter_count * sizeof (grpc_channel_element));
+ size_t size =
+ ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_channel_stack)) +
+ ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_channel_element));
size_t i;
- GPR_ASSERT ((GPR_MAX_ALIGNMENT & (GPR_MAX_ALIGNMENT - 1)) == 0 && "GPR_MAX_ALIGNMENT must be a power of two");
+ GPR_ASSERT((GPR_MAX_ALIGNMENT & (GPR_MAX_ALIGNMENT - 1)) == 0 &&
+ "GPR_MAX_ALIGNMENT must be a power of two");
/* add the size for each filter */
- for (i = 0; i < filter_count; i++)
- {
- size += ROUND_UP_TO_ALIGNMENT_SIZE (filters[i]->sizeof_channel_data);
- }
+ for (i = 0; i < filter_count; i++) {
+ size += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_channel_data);
+ }
return size;
}
@@ -85,142 +86,142 @@ grpc_channel_stack_size (const grpc_channel_filter ** filters, size_t filter_cou
((grpc_call_element *)((char *)(stk) + \
ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack))))
-grpc_channel_element *
-grpc_channel_stack_element (grpc_channel_stack * channel_stack, size_t index)
-{
- return CHANNEL_ELEMS_FROM_STACK (channel_stack) + index;
+grpc_channel_element *grpc_channel_stack_element(
+ grpc_channel_stack *channel_stack, size_t index) {
+ return CHANNEL_ELEMS_FROM_STACK(channel_stack) + index;
}
-grpc_channel_element *
-grpc_channel_stack_last_element (grpc_channel_stack * channel_stack)
-{
- return grpc_channel_stack_element (channel_stack, channel_stack->count - 1);
+grpc_channel_element *grpc_channel_stack_last_element(
+ grpc_channel_stack *channel_stack) {
+ return grpc_channel_stack_element(channel_stack, channel_stack->count - 1);
}
-grpc_call_element *
-grpc_call_stack_element (grpc_call_stack * call_stack, size_t index)
-{
- return CALL_ELEMS_FROM_STACK (call_stack) + index;
+grpc_call_element *grpc_call_stack_element(grpc_call_stack *call_stack,
+ size_t index) {
+ return CALL_ELEMS_FROM_STACK(call_stack) + index;
}
-void
-grpc_channel_stack_init (grpc_exec_ctx * exec_ctx, const grpc_channel_filter ** filters, size_t filter_count, grpc_channel * master, const grpc_channel_args * args, grpc_mdctx * metadata_context, grpc_channel_stack * stack)
-{
- size_t call_size = ROUND_UP_TO_ALIGNMENT_SIZE (sizeof (grpc_call_stack)) + ROUND_UP_TO_ALIGNMENT_SIZE (filter_count * sizeof (grpc_call_element));
+void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx,
+ const grpc_channel_filter **filters,
+ size_t filter_count, grpc_channel *master,
+ const grpc_channel_args *args,
+ grpc_mdctx *metadata_context,
+ grpc_channel_stack *stack) {
+ size_t call_size =
+ ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) +
+ ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_call_element));
grpc_channel_element *elems;
char *user_data;
size_t i;
stack->count = filter_count;
- elems = CHANNEL_ELEMS_FROM_STACK (stack);
- user_data = ((char *) elems) + ROUND_UP_TO_ALIGNMENT_SIZE (filter_count * sizeof (grpc_channel_element));
+ elems = CHANNEL_ELEMS_FROM_STACK(stack);
+ user_data =
+ ((char *)elems) +
+ ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_channel_element));
/* init per-filter data */
- for (i = 0; i < filter_count; i++)
- {
- elems[i].filter = filters[i];
- elems[i].channel_data = user_data;
- elems[i].filter->init_channel_elem (exec_ctx, &elems[i], master, args, metadata_context, i == 0, i == (filter_count - 1));
- user_data += ROUND_UP_TO_ALIGNMENT_SIZE (filters[i]->sizeof_channel_data);
- call_size += ROUND_UP_TO_ALIGNMENT_SIZE (filters[i]->sizeof_call_data);
- }
-
- GPR_ASSERT (user_data > (char *) stack);
- GPR_ASSERT ((gpr_uintptr) (user_data - (char *) stack) == grpc_channel_stack_size (filters, filter_count));
+ for (i = 0; i < filter_count; i++) {
+ elems[i].filter = filters[i];
+ elems[i].channel_data = user_data;
+ elems[i].filter->init_channel_elem(exec_ctx, &elems[i], master, args,
+ metadata_context, i == 0,
+ i == (filter_count - 1));
+ user_data += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_channel_data);
+ call_size += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_call_data);
+ }
+
+ GPR_ASSERT(user_data > (char *)stack);
+ GPR_ASSERT((gpr_uintptr)(user_data - (char *)stack) ==
+ grpc_channel_stack_size(filters, filter_count));
stack->call_stack_size = call_size;
}
-void
-grpc_channel_stack_destroy (grpc_exec_ctx * exec_ctx, grpc_channel_stack * stack)
-{
- grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK (stack);
+void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_channel_stack *stack) {
+ grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(stack);
size_t count = stack->count;
size_t i;
/* destroy per-filter data */
- for (i = 0; i < count; i++)
- {
- channel_elems[i].filter->destroy_channel_elem (exec_ctx, &channel_elems[i]);
- }
+ for (i = 0; i < count; i++) {
+ channel_elems[i].filter->destroy_channel_elem(exec_ctx, &channel_elems[i]);
+ }
}
-void
-grpc_call_stack_init (grpc_exec_ctx * exec_ctx, grpc_channel_stack * channel_stack, const void *transport_server_data, grpc_transport_stream_op * initial_op, grpc_call_stack * call_stack)
-{
- grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK (channel_stack);
+void grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
+ grpc_channel_stack *channel_stack,
+ const void *transport_server_data,
+ grpc_transport_stream_op *initial_op,
+ grpc_call_stack *call_stack) {
+ grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
size_t count = channel_stack->count;
grpc_call_element *call_elems;
char *user_data;
size_t i;
call_stack->count = count;
- call_elems = CALL_ELEMS_FROM_STACK (call_stack);
- user_data = ((char *) call_elems) + ROUND_UP_TO_ALIGNMENT_SIZE (count * sizeof (grpc_call_element));
+ call_elems = CALL_ELEMS_FROM_STACK(call_stack);
+ user_data = ((char *)call_elems) +
+ ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element));
/* init per-filter data */
- for (i = 0; i < count; i++)
- {
- call_elems[i].filter = channel_elems[i].filter;
- call_elems[i].channel_data = channel_elems[i].channel_data;
- call_elems[i].call_data = user_data;
- call_elems[i].filter->init_call_elem (exec_ctx, &call_elems[i], transport_server_data, initial_op);
- user_data += ROUND_UP_TO_ALIGNMENT_SIZE (call_elems[i].filter->sizeof_call_data);
- }
+ for (i = 0; i < count; i++) {
+ call_elems[i].filter = channel_elems[i].filter;
+ call_elems[i].channel_data = channel_elems[i].channel_data;
+ call_elems[i].call_data = user_data;
+ call_elems[i].filter->init_call_elem(exec_ctx, &call_elems[i],
+ transport_server_data, initial_op);
+ user_data +=
+ ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data);
+ }
}
-void
-grpc_call_stack_destroy (grpc_exec_ctx * exec_ctx, grpc_call_stack * stack)
-{
- grpc_call_element *elems = CALL_ELEMS_FROM_STACK (stack);
+void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack) {
+ grpc_call_element *elems = CALL_ELEMS_FROM_STACK(stack);
size_t count = stack->count;
size_t i;
/* destroy per-filter data */
- for (i = 0; i < count; i++)
- {
- elems[i].filter->destroy_call_elem (exec_ctx, &elems[i]);
- }
+ for (i = 0; i < count; i++) {
+ elems[i].filter->destroy_call_elem(exec_ctx, &elems[i]);
+ }
}
-void
-grpc_call_next_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op)
-{
+void grpc_call_next_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
grpc_call_element *next_elem = elem + 1;
- next_elem->filter->start_transport_stream_op (exec_ctx, next_elem, op);
+ next_elem->filter->start_transport_stream_op(exec_ctx, next_elem, op);
}
-char *
-grpc_call_next_get_peer (grpc_exec_ctx * exec_ctx, grpc_call_element * elem)
-{
+char *grpc_call_next_get_peer(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
grpc_call_element *next_elem = elem + 1;
- return next_elem->filter->get_peer (exec_ctx, next_elem);
+ return next_elem->filter->get_peer(exec_ctx, next_elem);
}
-void
-grpc_channel_next_op (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_transport_op * op)
-{
+void grpc_channel_next_op(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
+ grpc_transport_op *op) {
grpc_channel_element *next_elem = elem + 1;
- next_elem->filter->start_transport_op (exec_ctx, next_elem, op);
+ next_elem->filter->start_transport_op(exec_ctx, next_elem, op);
}
-grpc_channel_stack *
-grpc_channel_stack_from_top_element (grpc_channel_element * elem)
-{
- return (grpc_channel_stack *) ((char *) (elem) - ROUND_UP_TO_ALIGNMENT_SIZE (sizeof (grpc_channel_stack)));
+grpc_channel_stack *grpc_channel_stack_from_top_element(
+ grpc_channel_element *elem) {
+ return (grpc_channel_stack *)((char *)(elem)-ROUND_UP_TO_ALIGNMENT_SIZE(
+ sizeof(grpc_channel_stack)));
}
-grpc_call_stack *
-grpc_call_stack_from_top_element (grpc_call_element * elem)
-{
- return (grpc_call_stack *) ((char *) (elem) - ROUND_UP_TO_ALIGNMENT_SIZE (sizeof (grpc_call_stack)));
+grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) {
+ return (grpc_call_stack *)((char *)(elem)-ROUND_UP_TO_ALIGNMENT_SIZE(
+ sizeof(grpc_call_stack)));
}
-void
-grpc_call_element_send_cancel (grpc_exec_ctx * exec_ctx, grpc_call_element * cur_elem)
-{
+void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *cur_elem) {
grpc_transport_stream_op op;
- memset (&op, 0, sizeof (op));
+ memset(&op, 0, sizeof(op));
op.cancel_with_status = GRPC_STATUS_CANCELLED;
- grpc_call_next_op (exec_ctx, cur_elem, &op);
+ grpc_call_next_op(exec_ctx, cur_elem, &op);
}
diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h
index 0bfe326976..6732cc3018 100644
--- a/src/core/channel/channel_stack.h
+++ b/src/core/channel/channel_stack.h
@@ -61,15 +61,17 @@ typedef struct grpc_call_element grpc_call_element;
4. a name, which is useful when debugging
Members are laid out in approximate frequency of use order. */
-typedef struct
-{
+typedef struct {
/* Called to eg. send/receive data on a call.
See grpc_call_next_op on how to call the next element in the stack */
- void (*start_transport_stream_op) (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op);
+ void (*start_transport_stream_op)(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op *op);
/* Called to handle channel level operations - e.g. new calls, or transport
closure.
See grpc_channel_next_op on how to call the next element in the stack */
- void (*start_transport_op) (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_transport_op * op);
+ void (*start_transport_op)(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem, grpc_transport_op *op);
/* sizeof(per call data) */
size_t sizeof_call_data;
@@ -81,10 +83,12 @@ typedef struct
on a client; if it is non-NULL, then it points to memory owned by the
transport and is on the server. Most filters want to ignore this
argument. */
- void (*init_call_elem) (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, const void *server_transport_data, grpc_transport_stream_op * initial_op);
+ void (*init_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ const void *server_transport_data,
+ grpc_transport_stream_op *initial_op);
/* Destroy per call data.
The filter does not need to do any chaining */
- void (*destroy_call_elem) (grpc_exec_ctx * exec_ctx, grpc_call_element * elem);
+ void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem);
/* sizeof(per channel data) */
size_t sizeof_channel_data;
@@ -94,13 +98,17 @@ typedef struct
is_first, is_last designate this elements position in the stack, and are
useful for asserting correct configuration by upper layer code.
The filter does not need to do any chaining */
- void (*init_channel_elem) (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_channel * master, const grpc_channel_args * args, grpc_mdctx * metadata_context, int is_first, int is_last);
+ void (*init_channel_elem)(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
+ grpc_channel *master, const grpc_channel_args *args,
+ grpc_mdctx *metadata_context, int is_first,
+ int is_last);
/* Destroy per channel data.
The filter does not need to do any chaining */
- void (*destroy_channel_elem) (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem);
+ void (*destroy_channel_elem)(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem);
/* Implement grpc_call_get_peer() */
- char *(*get_peer) (grpc_exec_ctx * exec_ctx, grpc_call_element * elem);
+ char *(*get_peer)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem);
/* The name of this filter */
const char *name;
@@ -108,8 +116,7 @@ typedef struct
/* A channel_element tracks its filter and the filter requested memory within
a channel allocation */
-struct grpc_channel_element
-{
+struct grpc_channel_element {
const grpc_channel_filter *filter;
void *channel_data;
};
@@ -117,8 +124,7 @@ struct grpc_channel_element
/* A call_element tracks its filter, the filter requested memory within
a channel allocation, and the filter requested memory within a call
allocation */
-struct grpc_call_element
-{
+struct grpc_call_element {
const grpc_channel_filter *filter;
void *channel_data;
void *call_data;
@@ -126,8 +132,7 @@ struct grpc_call_element
/* A channel stack tracks a set of related filters for one channel, and
guarantees they live within a single malloc() allocation */
-typedef struct
-{
+typedef struct {
size_t count;
/* Memory required for a call stack (computed at channel stack
initialization) */
@@ -136,48 +141,63 @@ typedef struct
/* A call stack tracks a set of related filters for one call, and guarantees
they live within a single malloc() allocation */
-typedef struct
-{
- size_t count;
-} grpc_call_stack;
+typedef struct { size_t count; } grpc_call_stack;
/* Get a channel element given a channel stack and its index */
-grpc_channel_element *grpc_channel_stack_element (grpc_channel_stack * stack, size_t i);
+grpc_channel_element *grpc_channel_stack_element(grpc_channel_stack *stack,
+ size_t i);
/* Get the last channel element in a channel stack */
-grpc_channel_element *grpc_channel_stack_last_element (grpc_channel_stack * stack);
+grpc_channel_element *grpc_channel_stack_last_element(
+ grpc_channel_stack *stack);
/* Get a call stack element given a call stack and an index */
-grpc_call_element *grpc_call_stack_element (grpc_call_stack * stack, size_t i);
+grpc_call_element *grpc_call_stack_element(grpc_call_stack *stack, size_t i);
/* Determine memory required for a channel stack containing a set of filters */
-size_t grpc_channel_stack_size (const grpc_channel_filter ** filters, size_t filter_count);
+size_t grpc_channel_stack_size(const grpc_channel_filter **filters,
+ size_t filter_count);
/* Initialize a channel stack given some filters */
-void grpc_channel_stack_init (grpc_exec_ctx * exec_ctx, const grpc_channel_filter ** filters, size_t filter_count, grpc_channel * master, const grpc_channel_args * args, grpc_mdctx * metadata_context, grpc_channel_stack * stack);
+void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx,
+ const grpc_channel_filter **filters,
+ size_t filter_count, grpc_channel *master,
+ const grpc_channel_args *args,
+ grpc_mdctx *metadata_context,
+ grpc_channel_stack *stack);
/* Destroy a channel stack */
-void grpc_channel_stack_destroy (grpc_exec_ctx * exec_ctx, grpc_channel_stack * stack);
+void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_channel_stack *stack);
/* Initialize a call stack given a channel stack. transport_server_data is
expected to be NULL on a client, or an opaque transport owned pointer on the
server. */
-void grpc_call_stack_init (grpc_exec_ctx * exec_ctx, grpc_channel_stack * channel_stack, const void *transport_server_data, grpc_transport_stream_op * initial_op, grpc_call_stack * call_stack);
+void grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
+ grpc_channel_stack *channel_stack,
+ const void *transport_server_data,
+ grpc_transport_stream_op *initial_op,
+ grpc_call_stack *call_stack);
/* Destroy a call stack */
-void grpc_call_stack_destroy (grpc_exec_ctx * exec_ctx, grpc_call_stack * stack);
+void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack);
/* Call the next operation in a call stack */
-void grpc_call_next_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op);
+void grpc_call_next_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_transport_stream_op *op);
/* Call the next operation (depending on call directionality) in a channel
stack */
-void grpc_channel_next_op (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_transport_op * op);
+void grpc_channel_next_op(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
+ grpc_transport_op *op);
/* Pass through a request to get_peer to the next child element */
-char *grpc_call_next_get_peer (grpc_exec_ctx * exec_ctx, grpc_call_element * elem);
+char *grpc_call_next_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem);
/* Given the top element of a channel stack, get the channel stack itself */
-grpc_channel_stack *grpc_channel_stack_from_top_element (grpc_channel_element * elem);
+grpc_channel_stack *grpc_channel_stack_from_top_element(
+ grpc_channel_element *elem);
/* Given the top element of a call stack, get the call stack itself */
-grpc_call_stack *grpc_call_stack_from_top_element (grpc_call_element * elem);
+grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem);
-void grpc_call_log_op (char *file, int line, gpr_log_severity severity, grpc_call_element * elem, grpc_transport_stream_op * op);
+void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
+ grpc_call_element *elem, grpc_transport_stream_op *op);
-void grpc_call_element_send_cancel (grpc_exec_ctx * exec_ctx, grpc_call_element * cur_elem);
+void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *cur_elem);
extern int grpc_trace_channel;
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 97f097d301..ce03ad5eb0 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -51,8 +51,7 @@
typedef struct call_data call_data;
-typedef struct
-{
+typedef struct {
/** metadata context for this channel */
grpc_mdctx *mdctx;
/** resolver for this channel */
@@ -90,16 +89,14 @@ typedef struct
to watch for state changes from the lb_policy. When a state change is seen,
we
update the channel, and create a new watcher */
-typedef struct
-{
+typedef struct {
channel_data *chand;
grpc_closure on_changed;
grpc_connectivity_state state;
grpc_lb_policy *lb_policy;
} lb_policy_connectivity_watcher;
-typedef enum
-{
+typedef enum {
CALL_CREATED,
CALL_WAITING_FOR_SEND,
CALL_WAITING_FOR_CONFIG,
@@ -109,8 +106,7 @@ typedef enum
CALL_CANCELLED
} call_state;
-struct call_data
-{
+struct call_data {
/* owning element */
grpc_call_element *elem;
@@ -127,406 +123,361 @@ struct call_data
grpc_linked_mdelem details;
};
-static grpc_closure *
-merge_into_waiting_op (grpc_call_element * elem, grpc_transport_stream_op * new_op)
- GRPC_MUST_USE_RESULT;
+static grpc_closure *merge_into_waiting_op(grpc_call_element *elem,
+ grpc_transport_stream_op *new_op)
+ GRPC_MUST_USE_RESULT;
- static void handle_op_after_cancellation (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op)
-{
+static void handle_op_after_cancellation(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- if (op->send_ops)
- {
- grpc_stream_ops_unref_owned_objects (op->send_ops->ops, op->send_ops->nops);
- op->on_done_send->cb (exec_ctx, op->on_done_send->cb_arg, 0);
- }
- if (op->recv_ops)
- {
- char status[GPR_LTOA_MIN_BUFSIZE];
- grpc_metadata_batch mdb;
- gpr_ltoa (GRPC_STATUS_CANCELLED, status);
- calld->status.md = grpc_mdelem_from_strings (chand->mdctx, "grpc-status", status);
- calld->details.md = grpc_mdelem_from_strings (chand->mdctx, "grpc-message", "Cancelled");
- calld->status.prev = calld->details.next = NULL;
- calld->status.next = &calld->details;
- calld->details.prev = &calld->status;
- mdb.list.head = &calld->status;
- mdb.list.tail = &calld->details;
- mdb.garbage.head = mdb.garbage.tail = NULL;
- mdb.deadline = gpr_inf_future (GPR_CLOCK_REALTIME);
- grpc_sopb_add_metadata (op->recv_ops, mdb);
- *op->recv_state = GRPC_STREAM_CLOSED;
- op->on_done_recv->cb (exec_ctx, op->on_done_recv->cb_arg, 1);
- }
- if (op->on_consumed)
- {
- op->on_consumed->cb (exec_ctx, op->on_consumed->cb_arg, 0);
- }
+ if (op->send_ops) {
+ grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops);
+ op->on_done_send->cb(exec_ctx, op->on_done_send->cb_arg, 0);
+ }
+ if (op->recv_ops) {
+ char status[GPR_LTOA_MIN_BUFSIZE];
+ grpc_metadata_batch mdb;
+ gpr_ltoa(GRPC_STATUS_CANCELLED, status);
+ calld->status.md =
+ grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status);
+ calld->details.md =
+ grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled");
+ calld->status.prev = calld->details.next = NULL;
+ calld->status.next = &calld->details;
+ calld->details.prev = &calld->status;
+ mdb.list.head = &calld->status;
+ mdb.list.tail = &calld->details;
+ mdb.garbage.head = mdb.garbage.tail = NULL;
+ mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
+ grpc_sopb_add_metadata(op->recv_ops, mdb);
+ *op->recv_state = GRPC_STREAM_CLOSED;
+ op->on_done_recv->cb(exec_ctx, op->on_done_recv->cb_arg, 1);
+ }
+ if (op->on_consumed) {
+ op->on_consumed->cb(exec_ctx, op->on_consumed->cb_arg, 0);
+ }
}
-typedef struct
-{
+typedef struct {
grpc_closure closure;
grpc_call_element *elem;
} waiting_call;
-static void perform_transport_stream_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op, int continuation);
+static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op *op,
+ int continuation);
-static void
-continue_with_pick (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success)
-{
+static void continue_with_pick(grpc_exec_ctx *exec_ctx, void *arg,
+ int iomgr_success) {
waiting_call *wc = arg;
call_data *calld = wc->elem->call_data;
- perform_transport_stream_op (exec_ctx, wc->elem, &calld->waiting_op, 1);
- gpr_free (wc);
+ perform_transport_stream_op(exec_ctx, wc->elem, &calld->waiting_op, 1);
+ gpr_free(wc);
}
-static void
-add_to_lb_policy_wait_queue_locked_state_config (grpc_call_element * elem)
-{
+static void add_to_lb_policy_wait_queue_locked_state_config(
+ grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
- waiting_call *wc = gpr_malloc (sizeof (*wc));
- grpc_closure_init (&wc->closure, continue_with_pick, wc);
+ waiting_call *wc = gpr_malloc(sizeof(*wc));
+ grpc_closure_init(&wc->closure, continue_with_pick, wc);
wc->elem = elem;
- grpc_closure_list_add (&chand->waiting_for_config_closures, &wc->closure, 1);
+ grpc_closure_list_add(&chand->waiting_for_config_closures, &wc->closure, 1);
}
-static int
-is_empty (void *p, int len)
-{
+static int is_empty(void *p, int len) {
char *ptr = p;
int i;
- for (i = 0; i < len; i++)
- {
- if (ptr[i] != 0)
- return 0;
- }
+ for (i = 0; i < len; i++) {
+ if (ptr[i] != 0) return 0;
+ }
return 1;
}
-static void
-started_call (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success)
-{
+static void started_call(grpc_exec_ctx *exec_ctx, void *arg,
+ int iomgr_success) {
call_data *calld = arg;
grpc_transport_stream_op op;
int have_waiting;
- gpr_mu_lock (&calld->mu_state);
- if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL)
- {
- memset (&op, 0, sizeof (op));
- op.cancel_with_status = GRPC_STATUS_CANCELLED;
- gpr_mu_unlock (&calld->mu_state);
- grpc_subchannel_call_process_op (exec_ctx, calld->subchannel_call, &op);
- }
- else if (calld->state == CALL_WAITING_FOR_CALL)
- {
- have_waiting = !is_empty (&calld->waiting_op, sizeof (calld->waiting_op));
- if (calld->subchannel_call != NULL)
- {
- calld->state = CALL_ACTIVE;
- gpr_mu_unlock (&calld->mu_state);
- if (have_waiting)
- {
- grpc_subchannel_call_process_op (exec_ctx, calld->subchannel_call, &calld->waiting_op);
- }
- }
- else
- {
- calld->state = CALL_CANCELLED;
- gpr_mu_unlock (&calld->mu_state);
- if (have_waiting)
- {
- handle_op_after_cancellation (exec_ctx, calld->elem, &calld->waiting_op);
- }
- }
- }
- else
- {
- GPR_ASSERT (calld->state == CALL_CANCELLED);
- gpr_mu_unlock (&calld->mu_state);
- }
+ gpr_mu_lock(&calld->mu_state);
+ if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) {
+ memset(&op, 0, sizeof(op));
+ op.cancel_with_status = GRPC_STATUS_CANCELLED;
+ gpr_mu_unlock(&calld->mu_state);
+ grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, &op);
+ } else if (calld->state == CALL_WAITING_FOR_CALL) {
+ have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op));
+ if (calld->subchannel_call != NULL) {
+ calld->state = CALL_ACTIVE;
+ gpr_mu_unlock(&calld->mu_state);
+ if (have_waiting) {
+ grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call,
+ &calld->waiting_op);
+ }
+ } else {
+ calld->state = CALL_CANCELLED;
+ gpr_mu_unlock(&calld->mu_state);
+ if (have_waiting) {
+ handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op);
+ }
+ }
+ } else {
+ GPR_ASSERT(calld->state == CALL_CANCELLED);
+ gpr_mu_unlock(&calld->mu_state);
+ }
}
-static void
-picked_target (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success)
-{
+static void picked_target(grpc_exec_ctx *exec_ctx, void *arg,
+ int iomgr_success) {
call_data *calld = arg;
grpc_pollset *pollset;
- if (calld->picked_channel == NULL)
- {
- /* treat this like a cancellation */
- calld->waiting_op.cancel_with_status = GRPC_STATUS_UNAVAILABLE;
- perform_transport_stream_op (exec_ctx, calld->elem, &calld->waiting_op, 1);
- }
- else
- {
- gpr_mu_lock (&calld->mu_state);
- if (calld->state == CALL_CANCELLED)
- {
- gpr_mu_unlock (&calld->mu_state);
- handle_op_after_cancellation (exec_ctx, calld->elem, &calld->waiting_op);
- }
- else
- {
- GPR_ASSERT (calld->state == CALL_WAITING_FOR_PICK);
- calld->state = CALL_WAITING_FOR_CALL;
- pollset = calld->waiting_op.bind_pollset;
- gpr_mu_unlock (&calld->mu_state);
- grpc_closure_init (&calld->async_setup_task, started_call, calld);
- grpc_subchannel_create_call (exec_ctx, calld->picked_channel, pollset, &calld->subchannel_call, &calld->async_setup_task);
- }
- }
+ if (calld->picked_channel == NULL) {
+ /* treat this like a cancellation */
+ calld->waiting_op.cancel_with_status = GRPC_STATUS_UNAVAILABLE;
+ perform_transport_stream_op(exec_ctx, calld->elem, &calld->waiting_op, 1);
+ } else {
+ gpr_mu_lock(&calld->mu_state);
+ if (calld->state == CALL_CANCELLED) {
+ gpr_mu_unlock(&calld->mu_state);
+ handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op);
+ } else {
+ GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK);
+ calld->state = CALL_WAITING_FOR_CALL;
+ pollset = calld->waiting_op.bind_pollset;
+ gpr_mu_unlock(&calld->mu_state);
+ grpc_closure_init(&calld->async_setup_task, started_call, calld);
+ grpc_subchannel_create_call(exec_ctx, calld->picked_channel, pollset,
+ &calld->subchannel_call,
+ &calld->async_setup_task);
+ }
+ }
}
-static grpc_closure *
-merge_into_waiting_op (grpc_call_element * elem, grpc_transport_stream_op * new_op)
-{
+static grpc_closure *merge_into_waiting_op(grpc_call_element *elem,
+ grpc_transport_stream_op *new_op) {
call_data *calld = elem->call_data;
grpc_closure *consumed_op = NULL;
grpc_transport_stream_op *waiting_op = &calld->waiting_op;
- GPR_ASSERT ((waiting_op->send_ops != NULL) + (new_op->send_ops != NULL) <= 1);
- GPR_ASSERT ((waiting_op->recv_ops != NULL) + (new_op->recv_ops != NULL) <= 1);
- if (new_op->send_ops != NULL)
- {
- waiting_op->send_ops = new_op->send_ops;
- waiting_op->is_last_send = new_op->is_last_send;
- waiting_op->on_done_send = new_op->on_done_send;
- }
- if (new_op->recv_ops != NULL)
- {
- waiting_op->recv_ops = new_op->recv_ops;
- waiting_op->recv_state = new_op->recv_state;
- waiting_op->on_done_recv = new_op->on_done_recv;
- }
- if (new_op->on_consumed != NULL)
- {
- if (waiting_op->on_consumed != NULL)
- {
- consumed_op = waiting_op->on_consumed;
- }
- waiting_op->on_consumed = new_op->on_consumed;
- }
- if (new_op->cancel_with_status != GRPC_STATUS_OK)
- {
- waiting_op->cancel_with_status = new_op->cancel_with_status;
- }
+ GPR_ASSERT((waiting_op->send_ops != NULL) + (new_op->send_ops != NULL) <= 1);
+ GPR_ASSERT((waiting_op->recv_ops != NULL) + (new_op->recv_ops != NULL) <= 1);
+ if (new_op->send_ops != NULL) {
+ waiting_op->send_ops = new_op->send_ops;
+ waiting_op->is_last_send = new_op->is_last_send;
+ waiting_op->on_done_send = new_op->on_done_send;
+ }
+ if (new_op->recv_ops != NULL) {
+ waiting_op->recv_ops = new_op->recv_ops;
+ waiting_op->recv_state = new_op->recv_state;
+ waiting_op->on_done_recv = new_op->on_done_recv;
+ }
+ if (new_op->on_consumed != NULL) {
+ if (waiting_op->on_consumed != NULL) {
+ consumed_op = waiting_op->on_consumed;
+ }
+ waiting_op->on_consumed = new_op->on_consumed;
+ }
+ if (new_op->cancel_with_status != GRPC_STATUS_OK) {
+ waiting_op->cancel_with_status = new_op->cancel_with_status;
+ }
return consumed_op;
}
-static char *
-cc_get_peer (grpc_exec_ctx * exec_ctx, grpc_call_element * elem)
-{
+static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_subchannel_call *subchannel_call;
char *result;
- gpr_mu_lock (&calld->mu_state);
- if (calld->state == CALL_ACTIVE)
- {
- subchannel_call = calld->subchannel_call;
- GRPC_SUBCHANNEL_CALL_REF (subchannel_call, "get_peer");
- gpr_mu_unlock (&calld->mu_state);
- result = grpc_subchannel_call_get_peer (exec_ctx, subchannel_call);
- GRPC_SUBCHANNEL_CALL_UNREF (exec_ctx, subchannel_call, "get_peer");
- return result;
- }
- else
- {
- gpr_mu_unlock (&calld->mu_state);
- return grpc_channel_get_target (chand->master);
- }
+ gpr_mu_lock(&calld->mu_state);
+ if (calld->state == CALL_ACTIVE) {
+ subchannel_call = calld->subchannel_call;
+ GRPC_SUBCHANNEL_CALL_REF(subchannel_call, "get_peer");
+ gpr_mu_unlock(&calld->mu_state);
+ result = grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
+ GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "get_peer");
+ return result;
+ } else {
+ gpr_mu_unlock(&calld->mu_state);
+ return grpc_channel_get_target(chand->master);
+ }
}
-static void
-perform_transport_stream_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op, int continuation)
-{
+static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op *op,
+ int continuation) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_subchannel_call *subchannel_call;
grpc_lb_policy *lb_policy;
grpc_transport_stream_op op2;
- GPR_ASSERT (elem->filter == &grpc_client_channel_filter);
- GRPC_CALL_LOG_OP (GPR_INFO, elem, op);
+ GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- gpr_mu_lock (&calld->mu_state);
- switch (calld->state)
- {
+ gpr_mu_lock(&calld->mu_state);
+ switch (calld->state) {
case CALL_ACTIVE:
- GPR_ASSERT (!continuation);
+ GPR_ASSERT(!continuation);
subchannel_call = calld->subchannel_call;
- gpr_mu_unlock (&calld->mu_state);
- grpc_subchannel_call_process_op (exec_ctx, subchannel_call, op);
+ gpr_mu_unlock(&calld->mu_state);
+ grpc_subchannel_call_process_op(exec_ctx, subchannel_call, op);
break;
case CALL_CANCELLED:
- gpr_mu_unlock (&calld->mu_state);
- handle_op_after_cancellation (exec_ctx, elem, op);
+ gpr_mu_unlock(&calld->mu_state);
+ handle_op_after_cancellation(exec_ctx, elem, op);
break;
case CALL_WAITING_FOR_SEND:
- GPR_ASSERT (!continuation);
- grpc_exec_ctx_enqueue (exec_ctx, merge_into_waiting_op (elem, op), 1);
- if (!calld->waiting_op.send_ops && calld->waiting_op.cancel_with_status == GRPC_STATUS_OK)
- {
- gpr_mu_unlock (&calld->mu_state);
- break;
- }
+ GPR_ASSERT(!continuation);
+ grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1);
+ if (!calld->waiting_op.send_ops &&
+ calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) {
+ gpr_mu_unlock(&calld->mu_state);
+ break;
+ }
*op = calld->waiting_op;
- memset (&calld->waiting_op, 0, sizeof (calld->waiting_op));
+ memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
continuation = 1;
- /* fall through */
+ /* fall through */
case CALL_WAITING_FOR_CONFIG:
case CALL_WAITING_FOR_PICK:
case CALL_WAITING_FOR_CALL:
- if (!continuation)
- {
- if (op->cancel_with_status != GRPC_STATUS_OK)
- {
- calld->state = CALL_CANCELLED;
- op2 = calld->waiting_op;
- memset (&calld->waiting_op, 0, sizeof (calld->waiting_op));
- if (op->on_consumed)
- {
- calld->waiting_op.on_consumed = op->on_consumed;
- op->on_consumed = NULL;
- }
- else if (op2.on_consumed)
- {
- calld->waiting_op.on_consumed = op2.on_consumed;
- op2.on_consumed = NULL;
- }
- gpr_mu_unlock (&calld->mu_state);
- handle_op_after_cancellation (exec_ctx, elem, op);
- handle_op_after_cancellation (exec_ctx, elem, &op2);
- }
- else
- {
- grpc_exec_ctx_enqueue (exec_ctx, merge_into_waiting_op (elem, op), 1);
- gpr_mu_unlock (&calld->mu_state);
- }
- break;
- }
- /* fall through */
+ if (!continuation) {
+ if (op->cancel_with_status != GRPC_STATUS_OK) {
+ calld->state = CALL_CANCELLED;
+ op2 = calld->waiting_op;
+ memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
+ if (op->on_consumed) {
+ calld->waiting_op.on_consumed = op->on_consumed;
+ op->on_consumed = NULL;
+ } else if (op2.on_consumed) {
+ calld->waiting_op.on_consumed = op2.on_consumed;
+ op2.on_consumed = NULL;
+ }
+ gpr_mu_unlock(&calld->mu_state);
+ handle_op_after_cancellation(exec_ctx, elem, op);
+ handle_op_after_cancellation(exec_ctx, elem, &op2);
+ } else {
+ grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1);
+ gpr_mu_unlock(&calld->mu_state);
+ }
+ break;
+ }
+ /* fall through */
case CALL_CREATED:
- if (op->cancel_with_status != GRPC_STATUS_OK)
- {
- calld->state = CALL_CANCELLED;
- gpr_mu_unlock (&calld->mu_state);
- handle_op_after_cancellation (exec_ctx, elem, op);
- }
- else
- {
- calld->waiting_op = *op;
-
- if (op->send_ops == NULL)
- {
- /* need to have some send ops before we can select the
- lb target */
- calld->state = CALL_WAITING_FOR_SEND;
- gpr_mu_unlock (&calld->mu_state);
- }
- else
- {
- gpr_mu_lock (&chand->mu_config);
- lb_policy = chand->lb_policy;
- if (lb_policy)
- {
- grpc_transport_stream_op *op = &calld->waiting_op;
- grpc_pollset *bind_pollset = op->bind_pollset;
- grpc_metadata_batch *initial_metadata = &op->send_ops->ops[0].data.metadata;
- GRPC_LB_POLICY_REF (lb_policy, "pick");
- gpr_mu_unlock (&chand->mu_config);
- calld->state = CALL_WAITING_FOR_PICK;
-
- GPR_ASSERT (op->bind_pollset);
- GPR_ASSERT (op->send_ops);
- GPR_ASSERT (op->send_ops->nops >= 1);
- GPR_ASSERT (op->send_ops->ops[0].type == GRPC_OP_METADATA);
- gpr_mu_unlock (&calld->mu_state);
-
- grpc_closure_init (&calld->async_setup_task, picked_target, calld);
- grpc_lb_policy_pick (exec_ctx, lb_policy, bind_pollset, initial_metadata, &calld->picked_channel, &calld->async_setup_task);
-
- GRPC_LB_POLICY_UNREF (exec_ctx, lb_policy, "pick");
- }
- else if (chand->resolver != NULL)
- {
- calld->state = CALL_WAITING_FOR_CONFIG;
- add_to_lb_policy_wait_queue_locked_state_config (elem);
- if (!chand->started_resolving && chand->resolver != NULL)
- {
- GRPC_CHANNEL_INTERNAL_REF (chand->master, "resolver");
- chand->started_resolving = 1;
- grpc_resolver_next (exec_ctx, chand->resolver, &chand->incoming_configuration, &chand->on_config_changed);
- }
- gpr_mu_unlock (&chand->mu_config);
- gpr_mu_unlock (&calld->mu_state);
- }
- else
- {
- calld->state = CALL_CANCELLED;
- gpr_mu_unlock (&chand->mu_config);
- gpr_mu_unlock (&calld->mu_state);
- handle_op_after_cancellation (exec_ctx, elem, op);
- }
- }
- }
+ if (op->cancel_with_status != GRPC_STATUS_OK) {
+ calld->state = CALL_CANCELLED;
+ gpr_mu_unlock(&calld->mu_state);
+ handle_op_after_cancellation(exec_ctx, elem, op);
+ } else {
+ calld->waiting_op = *op;
+
+ if (op->send_ops == NULL) {
+ /* need to have some send ops before we can select the
+ lb target */
+ calld->state = CALL_WAITING_FOR_SEND;
+ gpr_mu_unlock(&calld->mu_state);
+ } else {
+ gpr_mu_lock(&chand->mu_config);
+ lb_policy = chand->lb_policy;
+ if (lb_policy) {
+ grpc_transport_stream_op *op = &calld->waiting_op;
+ grpc_pollset *bind_pollset = op->bind_pollset;
+ grpc_metadata_batch *initial_metadata =
+ &op->send_ops->ops[0].data.metadata;
+ GRPC_LB_POLICY_REF(lb_policy, "pick");
+ gpr_mu_unlock(&chand->mu_config);
+ calld->state = CALL_WAITING_FOR_PICK;
+
+ GPR_ASSERT(op->bind_pollset);
+ GPR_ASSERT(op->send_ops);
+ GPR_ASSERT(op->send_ops->nops >= 1);
+ GPR_ASSERT(op->send_ops->ops[0].type == GRPC_OP_METADATA);
+ gpr_mu_unlock(&calld->mu_state);
+
+ grpc_closure_init(&calld->async_setup_task, picked_target, calld);
+ grpc_lb_policy_pick(exec_ctx, lb_policy, bind_pollset,
+ initial_metadata, &calld->picked_channel,
+ &calld->async_setup_task);
+
+ GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick");
+ } else if (chand->resolver != NULL) {
+ calld->state = CALL_WAITING_FOR_CONFIG;
+ add_to_lb_policy_wait_queue_locked_state_config(elem);
+ if (!chand->started_resolving && chand->resolver != NULL) {
+ GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
+ chand->started_resolving = 1;
+ grpc_resolver_next(exec_ctx, chand->resolver,
+ &chand->incoming_configuration,
+ &chand->on_config_changed);
+ }
+ gpr_mu_unlock(&chand->mu_config);
+ gpr_mu_unlock(&calld->mu_state);
+ } else {
+ calld->state = CALL_CANCELLED;
+ gpr_mu_unlock(&chand->mu_config);
+ gpr_mu_unlock(&calld->mu_state);
+ handle_op_after_cancellation(exec_ctx, elem, op);
+ }
+ }
+ }
break;
- }
+ }
}
-static void
-cc_start_transport_stream_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op)
-{
- perform_transport_stream_op (exec_ctx, elem, op, 0);
+static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
+ perform_transport_stream_op(exec_ctx, elem, op, 0);
}
-static void watch_lb_policy (grpc_exec_ctx * exec_ctx, channel_data * chand, grpc_lb_policy * lb_policy, grpc_connectivity_state current_state);
+static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
+ grpc_lb_policy *lb_policy,
+ grpc_connectivity_state current_state);
-static void
-on_lb_policy_state_changed_locked (grpc_exec_ctx * exec_ctx, lb_policy_connectivity_watcher * w)
-{
+static void on_lb_policy_state_changed_locked(
+ grpc_exec_ctx *exec_ctx, lb_policy_connectivity_watcher *w) {
/* check if the notification is for a stale policy */
- if (w->lb_policy != w->chand->lb_policy)
- return;
+ if (w->lb_policy != w->chand->lb_policy) return;
- grpc_connectivity_state_set (exec_ctx, &w->chand->state_tracker, w->state, "lb_changed");
- if (w->state != GRPC_CHANNEL_FATAL_FAILURE)
- {
- watch_lb_policy (exec_ctx, w->chand, w->lb_policy, w->state);
- }
+ grpc_connectivity_state_set(exec_ctx, &w->chand->state_tracker, w->state,
+ "lb_changed");
+ if (w->state != GRPC_CHANNEL_FATAL_FAILURE) {
+ watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state);
+ }
}
-static void
-on_lb_policy_state_changed (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success)
-{
+static void on_lb_policy_state_changed(grpc_exec_ctx *exec_ctx, void *arg,
+ int iomgr_success) {
lb_policy_connectivity_watcher *w = arg;
- gpr_mu_lock (&w->chand->mu_config);
- on_lb_policy_state_changed_locked (exec_ctx, w);
- gpr_mu_unlock (&w->chand->mu_config);
+ gpr_mu_lock(&w->chand->mu_config);
+ on_lb_policy_state_changed_locked(exec_ctx, w);
+ gpr_mu_unlock(&w->chand->mu_config);
- GRPC_CHANNEL_INTERNAL_UNREF (exec_ctx, w->chand->master, "watch_lb_policy");
- gpr_free (w);
+ GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->chand->master, "watch_lb_policy");
+ gpr_free(w);
}
-static void
-watch_lb_policy (grpc_exec_ctx * exec_ctx, channel_data * chand, grpc_lb_policy * lb_policy, grpc_connectivity_state current_state)
-{
- lb_policy_connectivity_watcher *w = gpr_malloc (sizeof (*w));
- GRPC_CHANNEL_INTERNAL_REF (chand->master, "watch_lb_policy");
+static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
+ grpc_lb_policy *lb_policy,
+ grpc_connectivity_state current_state) {
+ lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
+ GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy");
w->chand = chand;
- grpc_closure_init (&w->on_changed, on_lb_policy_state_changed, w);
+ grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w);
w->state = current_state;
w->lb_policy = lb_policy;
- grpc_lb_policy_notify_on_state_change (exec_ctx, lb_policy, &w->state, &w->on_changed);
+ grpc_lb_policy_notify_on_state_change(exec_ctx, lb_policy, &w->state,
+ &w->on_changed);
}
-static void
-cc_on_config_changed (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success)
-{
+static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
+ int iomgr_success) {
channel_data *chand = arg;
grpc_lb_policy *lb_policy = NULL;
grpc_lb_policy *old_lb_policy;
@@ -534,313 +485,295 @@ cc_on_config_changed (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success)
grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
int exit_idle = 0;
- if (chand->incoming_configuration != NULL)
- {
- lb_policy = grpc_client_config_get_lb_policy (chand->incoming_configuration);
- if (lb_policy != NULL)
- {
- GRPC_LB_POLICY_REF (lb_policy, "channel");
- GRPC_LB_POLICY_REF (lb_policy, "config_change");
- state = grpc_lb_policy_check_connectivity (exec_ctx, lb_policy);
- }
-
- grpc_client_config_unref (exec_ctx, chand->incoming_configuration);
+ if (chand->incoming_configuration != NULL) {
+ lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration);
+ if (lb_policy != NULL) {
+ GRPC_LB_POLICY_REF(lb_policy, "channel");
+ GRPC_LB_POLICY_REF(lb_policy, "config_change");
+ state = grpc_lb_policy_check_connectivity(exec_ctx, lb_policy);
}
+ grpc_client_config_unref(exec_ctx, chand->incoming_configuration);
+ }
+
chand->incoming_configuration = NULL;
- gpr_mu_lock (&chand->mu_config);
+ gpr_mu_lock(&chand->mu_config);
old_lb_policy = chand->lb_policy;
chand->lb_policy = lb_policy;
- if (lb_policy != NULL || chand->resolver == NULL /* disconnected */ )
- {
- grpc_exec_ctx_enqueue_list (exec_ctx, &chand->waiting_for_config_closures);
- }
- if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives)
- {
- GRPC_LB_POLICY_REF (lb_policy, "exit_idle");
- exit_idle = 1;
- chand->exit_idle_when_lb_policy_arrives = 0;
- }
-
- if (iomgr_success && chand->resolver)
- {
- grpc_resolver *resolver = chand->resolver;
- GRPC_RESOLVER_REF (resolver, "channel-next");
- grpc_connectivity_state_set (exec_ctx, &chand->state_tracker, state, "new_lb+resolver");
- if (lb_policy != NULL)
- {
- watch_lb_policy (exec_ctx, chand, lb_policy, state);
- }
- gpr_mu_unlock (&chand->mu_config);
- GRPC_CHANNEL_INTERNAL_REF (chand->master, "resolver");
- grpc_resolver_next (exec_ctx, resolver, &chand->incoming_configuration, &chand->on_config_changed);
- GRPC_RESOLVER_UNREF (exec_ctx, resolver, "channel-next");
- }
- else
- {
- old_resolver = chand->resolver;
- chand->resolver = NULL;
- grpc_connectivity_state_set (exec_ctx, &chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone");
- gpr_mu_unlock (&chand->mu_config);
- if (old_resolver != NULL)
- {
- grpc_resolver_shutdown (exec_ctx, old_resolver);
- GRPC_RESOLVER_UNREF (exec_ctx, old_resolver, "channel");
- }
- }
-
- if (exit_idle)
- {
- grpc_lb_policy_exit_idle (exec_ctx, lb_policy);
- GRPC_LB_POLICY_UNREF (exec_ctx, lb_policy, "exit_idle");
- }
-
- if (old_lb_policy != NULL)
- {
- grpc_lb_policy_shutdown (exec_ctx, old_lb_policy);
- GRPC_LB_POLICY_UNREF (exec_ctx, old_lb_policy, "channel");
- }
-
- if (lb_policy != NULL)
- {
- GRPC_LB_POLICY_UNREF (exec_ctx, lb_policy, "config_change");
- }
-
- GRPC_CHANNEL_INTERNAL_UNREF (exec_ctx, chand->master, "resolver");
+ if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) {
+ grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures);
+ }
+ if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) {
+ GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
+ exit_idle = 1;
+ chand->exit_idle_when_lb_policy_arrives = 0;
+ }
+
+ if (iomgr_success && chand->resolver) {
+ grpc_resolver *resolver = chand->resolver;
+ GRPC_RESOLVER_REF(resolver, "channel-next");
+ grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state,
+ "new_lb+resolver");
+ if (lb_policy != NULL) {
+ watch_lb_policy(exec_ctx, chand, lb_policy, state);
+ }
+ gpr_mu_unlock(&chand->mu_config);
+ GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
+ grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration,
+ &chand->on_config_changed);
+ GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel-next");
+ } else {
+ old_resolver = chand->resolver;
+ chand->resolver = NULL;
+ grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
+ GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone");
+ gpr_mu_unlock(&chand->mu_config);
+ if (old_resolver != NULL) {
+ grpc_resolver_shutdown(exec_ctx, old_resolver);
+ GRPC_RESOLVER_UNREF(exec_ctx, old_resolver, "channel");
+ }
+ }
+
+ if (exit_idle) {
+ grpc_lb_policy_exit_idle(exec_ctx, lb_policy);
+ GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "exit_idle");
+ }
+
+ if (old_lb_policy != NULL) {
+ grpc_lb_policy_shutdown(exec_ctx, old_lb_policy);
+ GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel");
+ }
+
+ if (lb_policy != NULL) {
+ GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "config_change");
+ }
+
+ GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->master, "resolver");
}
-static void
-cc_start_transport_op (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_transport_op * op)
-{
+static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem,
+ grpc_transport_op *op) {
grpc_lb_policy *lb_policy = NULL;
channel_data *chand = elem->channel_data;
grpc_resolver *destroy_resolver = NULL;
- grpc_exec_ctx_enqueue (exec_ctx, op->on_consumed, 1);
-
- GPR_ASSERT (op->set_accept_stream == NULL);
- GPR_ASSERT (op->bind_pollset == NULL);
-
- gpr_mu_lock (&chand->mu_config);
- if (op->on_connectivity_state_change != NULL)
- {
- grpc_connectivity_state_notify_on_state_change (exec_ctx, &chand->state_tracker, op->connectivity_state, op->on_connectivity_state_change);
- op->on_connectivity_state_change = NULL;
- op->connectivity_state = NULL;
- }
-
- if (!is_empty (op, sizeof (*op)))
- {
- lb_policy = chand->lb_policy;
- if (lb_policy)
- {
- GRPC_LB_POLICY_REF (lb_policy, "broadcast");
- }
- }
-
- if (op->disconnect && chand->resolver != NULL)
- {
- grpc_connectivity_state_set (exec_ctx, &chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
- destroy_resolver = chand->resolver;
- chand->resolver = NULL;
- if (chand->lb_policy != NULL)
- {
- grpc_lb_policy_shutdown (exec_ctx, chand->lb_policy);
- GRPC_LB_POLICY_UNREF (exec_ctx, chand->lb_policy, "channel");
- chand->lb_policy = NULL;
- }
- }
- gpr_mu_unlock (&chand->mu_config);
-
- if (destroy_resolver)
- {
- grpc_resolver_shutdown (exec_ctx, destroy_resolver);
- GRPC_RESOLVER_UNREF (exec_ctx, destroy_resolver, "channel");
- }
-
- if (lb_policy)
- {
- grpc_lb_policy_broadcast (exec_ctx, lb_policy, op);
- GRPC_LB_POLICY_UNREF (exec_ctx, lb_policy, "broadcast");
- }
+ grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1);
+
+ GPR_ASSERT(op->set_accept_stream == NULL);
+ GPR_ASSERT(op->bind_pollset == NULL);
+
+ gpr_mu_lock(&chand->mu_config);
+ if (op->on_connectivity_state_change != NULL) {
+ grpc_connectivity_state_notify_on_state_change(
+ exec_ctx, &chand->state_tracker, op->connectivity_state,
+ op->on_connectivity_state_change);
+ op->on_connectivity_state_change = NULL;
+ op->connectivity_state = NULL;
+ }
+
+ if (!is_empty(op, sizeof(*op))) {
+ lb_policy = chand->lb_policy;
+ if (lb_policy) {
+ GRPC_LB_POLICY_REF(lb_policy, "broadcast");
+ }
+ }
+
+ if (op->disconnect && chand->resolver != NULL) {
+ grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
+ GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
+ destroy_resolver = chand->resolver;
+ chand->resolver = NULL;
+ if (chand->lb_policy != NULL) {
+ grpc_lb_policy_shutdown(exec_ctx, chand->lb_policy);
+ GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
+ chand->lb_policy = NULL;
+ }
+ }
+ gpr_mu_unlock(&chand->mu_config);
+
+ if (destroy_resolver) {
+ grpc_resolver_shutdown(exec_ctx, destroy_resolver);
+ GRPC_RESOLVER_UNREF(exec_ctx, destroy_resolver, "channel");
+ }
+
+ if (lb_policy) {
+ grpc_lb_policy_broadcast(exec_ctx, lb_policy, op);
+ GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "broadcast");
+ }
}
/* Constructor for call_data */
-static void
-init_call_elem (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, const void *server_transport_data, grpc_transport_stream_op * initial_op)
-{
+static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ const void *server_transport_data,
+ grpc_transport_stream_op *initial_op) {
call_data *calld = elem->call_data;
/* TODO(ctiller): is there something useful we can do here? */
- GPR_ASSERT (initial_op == NULL);
+ GPR_ASSERT(initial_op == NULL);
- GPR_ASSERT (elem->filter == &grpc_client_channel_filter);
- GPR_ASSERT (server_transport_data == NULL);
- gpr_mu_init (&calld->mu_state);
+ GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
+ GPR_ASSERT(server_transport_data == NULL);
+ gpr_mu_init(&calld->mu_state);
calld->elem = elem;
calld->state = CALL_CREATED;
- calld->deadline = gpr_inf_future (GPR_CLOCK_REALTIME);
+ calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
}
/* Destructor for call_data */
-static void
-destroy_call_elem (grpc_exec_ctx * exec_ctx, grpc_call_element * elem)
-{
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
call_data *calld = elem->call_data;
grpc_subchannel_call *subchannel_call;
/* if the call got activated, we need to destroy the child stack also, and
remove it from the in-flight requests tracked by the child_entry we
picked */
- gpr_mu_lock (&calld->mu_state);
- switch (calld->state)
- {
+ gpr_mu_lock(&calld->mu_state);
+ switch (calld->state) {
case CALL_ACTIVE:
subchannel_call = calld->subchannel_call;
- gpr_mu_unlock (&calld->mu_state);
- GRPC_SUBCHANNEL_CALL_UNREF (exec_ctx, subchannel_call, "client_channel");
+ gpr_mu_unlock(&calld->mu_state);
+ GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "client_channel");
break;
case CALL_CREATED:
case CALL_CANCELLED:
- gpr_mu_unlock (&calld->mu_state);
+ gpr_mu_unlock(&calld->mu_state);
break;
case CALL_WAITING_FOR_PICK:
case CALL_WAITING_FOR_CONFIG:
case CALL_WAITING_FOR_CALL:
case CALL_WAITING_FOR_SEND:
- gpr_log (GPR_ERROR, "should never reach here");
- abort ();
+ gpr_log(GPR_ERROR, "should never reach here");
+ abort();
break;
- }
+ }
}
/* Constructor for channel_data */
-static void
-init_channel_elem (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_channel * master, const grpc_channel_args * args, grpc_mdctx * metadata_context, int is_first, int is_last)
-{
+static void init_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem, grpc_channel *master,
+ const grpc_channel_args *args,
+ grpc_mdctx *metadata_context, int is_first,
+ int is_last) {
channel_data *chand = elem->channel_data;
- memset (chand, 0, sizeof (*chand));
+ memset(chand, 0, sizeof(*chand));
- GPR_ASSERT (is_last);
- GPR_ASSERT (elem->filter == &grpc_client_channel_filter);
+ GPR_ASSERT(is_last);
+ GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
- gpr_mu_init (&chand->mu_config);
+ gpr_mu_init(&chand->mu_config);
chand->mdctx = metadata_context;
chand->master = master;
- grpc_pollset_set_init (&chand->pollset_set);
- grpc_closure_init (&chand->on_config_changed, cc_on_config_changed, chand);
+ grpc_pollset_set_init(&chand->pollset_set);
+ grpc_closure_init(&chand->on_config_changed, cc_on_config_changed, chand);
- grpc_connectivity_state_init (&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel");
+ grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
+ "client_channel");
}
/* Destructor for channel_data */
-static void
-destroy_channel_elem (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem)
-{
+static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem) {
channel_data *chand = elem->channel_data;
- if (chand->resolver != NULL)
- {
- grpc_resolver_shutdown (exec_ctx, chand->resolver);
- GRPC_RESOLVER_UNREF (exec_ctx, chand->resolver, "channel");
- }
- if (chand->lb_policy != NULL)
- {
- GRPC_LB_POLICY_UNREF (exec_ctx, chand->lb_policy, "channel");
- }
- grpc_connectivity_state_destroy (exec_ctx, &chand->state_tracker);
- grpc_pollset_set_destroy (&chand->pollset_set);
- gpr_mu_destroy (&chand->mu_config);
+ if (chand->resolver != NULL) {
+ grpc_resolver_shutdown(exec_ctx, chand->resolver);
+ GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
+ }
+ if (chand->lb_policy != NULL) {
+ GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
+ }
+ grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
+ grpc_pollset_set_destroy(&chand->pollset_set);
+ gpr_mu_destroy(&chand->mu_config);
}
const grpc_channel_filter grpc_client_channel_filter = {
- cc_start_transport_stream_op,
- cc_start_transport_op,
- sizeof (call_data),
- init_call_elem,
- destroy_call_elem,
- sizeof (channel_data),
- init_channel_elem,
- destroy_channel_elem,
- cc_get_peer,
- "client-channel",
+ cc_start_transport_stream_op,
+ cc_start_transport_op,
+ sizeof(call_data),
+ init_call_elem,
+ destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ cc_get_peer,
+ "client-channel",
};
-void
-grpc_client_channel_set_resolver (grpc_exec_ctx * exec_ctx, grpc_channel_stack * channel_stack, grpc_resolver * resolver)
-{
+void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx,
+ grpc_channel_stack *channel_stack,
+ grpc_resolver *resolver) {
/* post construction initialization: set the transport setup pointer */
- grpc_channel_element *elem = grpc_channel_stack_last_element (channel_stack);
+ grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
channel_data *chand = elem->channel_data;
- gpr_mu_lock (&chand->mu_config);
- GPR_ASSERT (!chand->resolver);
+ gpr_mu_lock(&chand->mu_config);
+ GPR_ASSERT(!chand->resolver);
chand->resolver = resolver;
- GRPC_RESOLVER_REF (resolver, "channel");
- if (!grpc_closure_list_empty (chand->waiting_for_config_closures) || chand->exit_idle_when_lb_policy_arrives)
- {
- chand->started_resolving = 1;
- GRPC_CHANNEL_INTERNAL_REF (chand->master, "resolver");
- grpc_resolver_next (exec_ctx, resolver, &chand->incoming_configuration, &chand->on_config_changed);
- }
- gpr_mu_unlock (&chand->mu_config);
+ GRPC_RESOLVER_REF(resolver, "channel");
+ if (!grpc_closure_list_empty(chand->waiting_for_config_closures) ||
+ chand->exit_idle_when_lb_policy_arrives) {
+ chand->started_resolving = 1;
+ GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
+ grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration,
+ &chand->on_config_changed);
+ }
+ gpr_mu_unlock(&chand->mu_config);
}
-grpc_connectivity_state
-grpc_client_channel_check_connectivity_state (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, int try_to_connect)
-{
+grpc_connectivity_state grpc_client_channel_check_connectivity_state(
+ grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) {
channel_data *chand = elem->channel_data;
grpc_connectivity_state out;
- gpr_mu_lock (&chand->mu_config);
- out = grpc_connectivity_state_check (&chand->state_tracker);
- if (out == GRPC_CHANNEL_IDLE && try_to_connect)
- {
- if (chand->lb_policy != NULL)
- {
- grpc_lb_policy_exit_idle (exec_ctx, chand->lb_policy);
- }
- else
- {
- chand->exit_idle_when_lb_policy_arrives = 1;
- if (!chand->started_resolving && chand->resolver != NULL)
- {
- GRPC_CHANNEL_INTERNAL_REF (chand->master, "resolver");
- chand->started_resolving = 1;
- grpc_resolver_next (exec_ctx, chand->resolver, &chand->incoming_configuration, &chand->on_config_changed);
- }
- }
- }
- gpr_mu_unlock (&chand->mu_config);
+ gpr_mu_lock(&chand->mu_config);
+ out = grpc_connectivity_state_check(&chand->state_tracker);
+ if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
+ if (chand->lb_policy != NULL) {
+ grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy);
+ } else {
+ chand->exit_idle_when_lb_policy_arrives = 1;
+ if (!chand->started_resolving && chand->resolver != NULL) {
+ GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
+ chand->started_resolving = 1;
+ grpc_resolver_next(exec_ctx, chand->resolver,
+ &chand->incoming_configuration,
+ &chand->on_config_changed);
+ }
+ }
+ }
+ gpr_mu_unlock(&chand->mu_config);
return out;
}
-void
-grpc_client_channel_watch_connectivity_state (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_connectivity_state * state, grpc_closure * on_complete)
-{
+void grpc_client_channel_watch_connectivity_state(
+ grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
+ grpc_connectivity_state *state, grpc_closure *on_complete) {
channel_data *chand = elem->channel_data;
- gpr_mu_lock (&chand->mu_config);
- grpc_connectivity_state_notify_on_state_change (exec_ctx, &chand->state_tracker, state, on_complete);
- gpr_mu_unlock (&chand->mu_config);
+ gpr_mu_lock(&chand->mu_config);
+ grpc_connectivity_state_notify_on_state_change(
+ exec_ctx, &chand->state_tracker, state, on_complete);
+ gpr_mu_unlock(&chand->mu_config);
}
-grpc_pollset_set *
-grpc_client_channel_get_connecting_pollset_set (grpc_channel_element * elem)
-{
+grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(
+ grpc_channel_element *elem) {
channel_data *chand = elem->channel_data;
return &chand->pollset_set;
}
-void
-grpc_client_channel_add_interested_party (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_pollset * pollset)
-{
+void grpc_client_channel_add_interested_party(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem,
+ grpc_pollset *pollset) {
channel_data *chand = elem->channel_data;
- grpc_pollset_set_add_pollset (exec_ctx, &chand->pollset_set, pollset);
+ grpc_pollset_set_add_pollset(exec_ctx, &chand->pollset_set, pollset);
}
-void
-grpc_client_channel_del_interested_party (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_pollset * pollset)
-{
+void grpc_client_channel_del_interested_party(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem,
+ grpc_pollset *pollset) {
channel_data *chand = elem->channel_data;
- grpc_pollset_set_del_pollset (exec_ctx, &chand->pollset_set, pollset);
+ grpc_pollset_set_del_pollset(exec_ctx, &chand->pollset_set, pollset);
}
diff --git a/src/core/channel/client_channel.h b/src/core/channel/client_channel.h
index 5af9794727..5103f07a43 100644
--- a/src/core/channel/client_channel.h
+++ b/src/core/channel/client_channel.h
@@ -49,15 +49,25 @@ extern const grpc_channel_filter grpc_client_channel_filter;
/* post-construction initializer to let the client channel know which
transport setup it should cancel upon destruction, or initiate when it needs
a connection */
-void grpc_client_channel_set_resolver (grpc_exec_ctx * exec_ctx, grpc_channel_stack * channel_stack, grpc_resolver * resolver);
+void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx,
+ grpc_channel_stack *channel_stack,
+ grpc_resolver *resolver);
-grpc_connectivity_state grpc_client_channel_check_connectivity_state (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, int try_to_connect);
+grpc_connectivity_state grpc_client_channel_check_connectivity_state(
+ grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect);
-void grpc_client_channel_watch_connectivity_state (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_connectivity_state * state, grpc_closure * on_complete);
+void grpc_client_channel_watch_connectivity_state(
+ grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
+ grpc_connectivity_state *state, grpc_closure *on_complete);
-grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set (grpc_channel_element * elem);
+grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(
+ grpc_channel_element *elem);
-void grpc_client_channel_add_interested_party (grpc_exec_ctx * exec_ctx, grpc_channel_element * channel, grpc_pollset * pollset);
-void grpc_client_channel_del_interested_party (grpc_exec_ctx * exec_ctx, grpc_channel_element * channel, grpc_pollset * pollset);
+void grpc_client_channel_add_interested_party(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *channel,
+ grpc_pollset *pollset);
+void grpc_client_channel_del_interested_party(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *channel,
+ grpc_pollset *pollset);
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_CHANNEL_H */
diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c
index 6acdc72075..f8dbe8c817 100644
--- a/src/core/channel/compress_filter.c
+++ b/src/core/channel/compress_filter.c
@@ -44,14 +44,13 @@
#include "src/core/compression/message_compress.h"
#include "src/core/support/string.h"
-typedef struct call_data
-{
+typedef struct call_data {
gpr_slice_buffer slices; /**< Buffers up input slices to be compressed */
grpc_linked_mdelem compression_algorithm_storage;
grpc_linked_mdelem accept_encoding_storage;
- gpr_uint32 remaining_slice_bytes;
- /**< Input data to be read, as per BEGIN_MESSAGE */
- int written_initial_metadata; /**< Already processed initial md? */
+ gpr_uint32 remaining_slice_bytes;
+ /**< Input data to be read, as per BEGIN_MESSAGE */
+ int written_initial_metadata; /**< Already processed initial md? */
/** Compression algorithm we'll try to use. It may be given by incoming
* metadata, or by the channel's default compression settings. */
grpc_compression_algorithm compression_algorithm;
@@ -59,8 +58,7 @@ typedef struct call_data
int has_compression_algorithm;
} call_data;
-typedef struct channel_data
-{
+typedef struct channel_data {
/** Metadata key for the incoming (requested) compression algorithm */
grpc_mdstr *mdstr_request_compression_algorithm_key;
/** Metadata key for the outgoing (used) compression algorithm */
@@ -82,62 +80,59 @@ typedef struct channel_data
* larger than the raw input).
*
* Returns 1 if the data was actually compress and 0 otherwise. */
-static int
-compress_send_sb (grpc_compression_algorithm algorithm, gpr_slice_buffer * slices)
-{
+static int compress_send_sb(grpc_compression_algorithm algorithm,
+ gpr_slice_buffer *slices) {
int did_compress;
gpr_slice_buffer tmp;
- gpr_slice_buffer_init (&tmp);
- did_compress = grpc_msg_compress (algorithm, slices, &tmp);
- if (did_compress)
- {
- gpr_slice_buffer_swap (slices, &tmp);
- }
- gpr_slice_buffer_destroy (&tmp);
+ gpr_slice_buffer_init(&tmp);
+ did_compress = grpc_msg_compress(algorithm, slices, &tmp);
+ if (did_compress) {
+ gpr_slice_buffer_swap(slices, &tmp);
+ }
+ gpr_slice_buffer_destroy(&tmp);
return did_compress;
}
/** For each \a md element from the incoming metadata, filter out the entry for
* "grpc-encoding", using its value to populate the call data's
* compression_algorithm field. */
-static grpc_mdelem *
-compression_md_filter (void *user_data, grpc_mdelem * md)
-{
+static grpc_mdelem *compression_md_filter(void *user_data, grpc_mdelem *md) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
- if (md->key == channeld->mdstr_request_compression_algorithm_key)
- {
- const char *md_c_str = grpc_mdstr_as_c_string (md->value);
- if (!grpc_compression_algorithm_parse (md_c_str, strlen (md_c_str), &calld->compression_algorithm))
- {
- gpr_log (GPR_ERROR, "Invalid compression algorithm: '%s' (unknown). Ignoring.", md_c_str);
- calld->compression_algorithm = GRPC_COMPRESS_NONE;
- }
- if (grpc_compression_options_is_algorithm_enabled (&channeld->compression_options, calld->compression_algorithm) == 0)
- {
- gpr_log (GPR_ERROR, "Invalid compression algorithm: '%s' (previously disabled). " "Ignoring.", md_c_str);
- calld->compression_algorithm = GRPC_COMPRESS_NONE;
- }
- calld->has_compression_algorithm = 1;
- return NULL;
+ if (md->key == channeld->mdstr_request_compression_algorithm_key) {
+ const char *md_c_str = grpc_mdstr_as_c_string(md->value);
+ if (!grpc_compression_algorithm_parse(md_c_str, strlen(md_c_str),
+ &calld->compression_algorithm)) {
+ gpr_log(GPR_ERROR,
+ "Invalid compression algorithm: '%s' (unknown). Ignoring.",
+ md_c_str);
+ calld->compression_algorithm = GRPC_COMPRESS_NONE;
+ }
+ if (grpc_compression_options_is_algorithm_enabled(
+ &channeld->compression_options, calld->compression_algorithm) ==
+ 0) {
+ gpr_log(GPR_ERROR,
+ "Invalid compression algorithm: '%s' (previously disabled). "
+ "Ignoring.",
+ md_c_str);
+ calld->compression_algorithm = GRPC_COMPRESS_NONE;
}
+ calld->has_compression_algorithm = 1;
+ return NULL;
+ }
return md;
}
-static int
-skip_compression (channel_data * channeld, call_data * calld)
-{
- if (calld->has_compression_algorithm)
- {
- if (calld->compression_algorithm == GRPC_COMPRESS_NONE)
- {
- return 1;
- }
- return 0; /* we have an actual call-specific algorithm */
+static int skip_compression(channel_data *channeld, call_data *calld) {
+ if (calld->has_compression_algorithm) {
+ if (calld->compression_algorithm == GRPC_COMPRESS_NONE) {
+ return 1;
}
+ return 0; /* we have an actual call-specific algorithm */
+ }
/* no per-call compression override */
return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE;
}
@@ -146,127 +141,126 @@ skip_compression (channel_data * channeld, call_data * calld)
* the associated GRPC_OP_BEGIN_MESSAGE accordingly (new compressed length,
* flags indicating compression is in effect) and replaces \a send_ops with it.
* */
-static void
-finish_compressed_sopb (grpc_stream_op_buffer * send_ops, grpc_call_element * elem)
-{
+static void finish_compressed_sopb(grpc_stream_op_buffer *send_ops,
+ grpc_call_element *elem) {
size_t i;
call_data *calld = elem->call_data;
- int new_slices_added = 0; /* GPR_FALSE */
+ int new_slices_added = 0; /* GPR_FALSE */
grpc_metadata_batch metadata;
grpc_stream_op_buffer new_send_ops;
- grpc_sopb_init (&new_send_ops);
-
- for (i = 0; i < send_ops->nops; i++)
- {
- grpc_stream_op *sop = &send_ops->ops[i];
- switch (sop->type)
- {
- case GRPC_OP_BEGIN_MESSAGE:
- GPR_ASSERT (calld->slices.length <= GPR_UINT32_MAX);
- grpc_sopb_add_begin_message (&new_send_ops, (gpr_uint32) calld->slices.length, sop->data.begin_message.flags | GRPC_WRITE_INTERNAL_COMPRESS);
- break;
- case GRPC_OP_SLICE:
- /* Once we reach the slices section of the original buffer, simply add
- * all the new (compressed) slices. We obviously want to do this only
- * once, hence the "new_slices_added" guard. */
- if (!new_slices_added)
- {
- size_t j;
- for (j = 0; j < calld->slices.count; ++j)
- {
- grpc_sopb_add_slice (&new_send_ops, gpr_slice_ref (calld->slices.slices[j]));
- }
- new_slices_added = 1; /* GPR_TRUE */
- }
- break;
- case GRPC_OP_METADATA:
- /* move the metadata to the new buffer. */
- grpc_metadata_batch_move (&metadata, &sop->data.metadata);
- grpc_sopb_add_metadata (&new_send_ops, metadata);
- break;
- case GRPC_NO_OP:
- break;
- }
+ grpc_sopb_init(&new_send_ops);
+
+ for (i = 0; i < send_ops->nops; i++) {
+ grpc_stream_op *sop = &send_ops->ops[i];
+ switch (sop->type) {
+ case GRPC_OP_BEGIN_MESSAGE:
+ GPR_ASSERT(calld->slices.length <= GPR_UINT32_MAX);
+ grpc_sopb_add_begin_message(
+ &new_send_ops, (gpr_uint32)calld->slices.length,
+ sop->data.begin_message.flags | GRPC_WRITE_INTERNAL_COMPRESS);
+ break;
+ case GRPC_OP_SLICE:
+ /* Once we reach the slices section of the original buffer, simply add
+ * all the new (compressed) slices. We obviously want to do this only
+ * once, hence the "new_slices_added" guard. */
+ if (!new_slices_added) {
+ size_t j;
+ for (j = 0; j < calld->slices.count; ++j) {
+ grpc_sopb_add_slice(&new_send_ops,
+ gpr_slice_ref(calld->slices.slices[j]));
+ }
+ new_slices_added = 1; /* GPR_TRUE */
+ }
+ break;
+ case GRPC_OP_METADATA:
+ /* move the metadata to the new buffer. */
+ grpc_metadata_batch_move(&metadata, &sop->data.metadata);
+ grpc_sopb_add_metadata(&new_send_ops, metadata);
+ break;
+ case GRPC_NO_OP:
+ break;
}
- grpc_sopb_swap (send_ops, &new_send_ops);
- grpc_sopb_destroy (&new_send_ops);
+ }
+ grpc_sopb_swap(send_ops, &new_send_ops);
+ grpc_sopb_destroy(&new_send_ops);
}
/** Filter's "main" function, called for any incoming grpc_transport_stream_op
* instance that holds a non-zero number of send operations, accesible to this
* function in \a send_ops. */
-static void
-process_send_ops (grpc_call_element * elem, grpc_stream_op_buffer * send_ops)
-{
+static void process_send_ops(grpc_call_element *elem,
+ grpc_stream_op_buffer *send_ops) {
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
size_t i;
int did_compress = 0;
/* In streaming calls, we need to reset the previously accumulated slices */
- gpr_slice_buffer_reset_and_unref (&calld->slices);
- for (i = 0; i < send_ops->nops; ++i)
- {
- grpc_stream_op *sop = &send_ops->ops[i];
- switch (sop->type)
- {
- case GRPC_OP_BEGIN_MESSAGE:
- /* buffer up slices until we've processed all the expected ones (as
- * given by GRPC_OP_BEGIN_MESSAGE) */
- calld->remaining_slice_bytes = sop->data.begin_message.length;
- if (sop->data.begin_message.flags & GRPC_WRITE_NO_COMPRESS)
- {
- calld->has_compression_algorithm = 1; /* GPR_TRUE */
- calld->compression_algorithm = GRPC_COMPRESS_NONE;
- }
- break;
- case GRPC_OP_METADATA:
- if (!calld->written_initial_metadata)
- {
- /* Parse incoming request for compression. If any, it'll be available
- * at calld->compression_algorithm */
- grpc_metadata_batch_filter (&(sop->data.metadata), compression_md_filter, elem);
- if (!calld->has_compression_algorithm)
- {
- /* If no algorithm was found in the metadata and we aren't
- * exceptionally skipping compression, fall back to the channel
- * default */
- calld->compression_algorithm = channeld->default_compression_algorithm;
- calld->has_compression_algorithm = 1; /* GPR_TRUE */
- }
- /* hint compression algorithm */
- grpc_metadata_batch_add_tail (&(sop->data.metadata), &calld->compression_algorithm_storage, GRPC_MDELEM_REF (channeld->mdelem_compression_algorithms[calld->compression_algorithm]));
-
- /* convey supported compression algorithms */
- grpc_metadata_batch_add_tail (&(sop->data.metadata), &calld->accept_encoding_storage, GRPC_MDELEM_REF (channeld->mdelem_accept_encoding));
-
- calld->written_initial_metadata = 1; /* GPR_TRUE */
- }
- break;
- case GRPC_OP_SLICE:
- if (skip_compression (channeld, calld))
- continue;
- GPR_ASSERT (calld->remaining_slice_bytes > 0);
- /* Increase input ref count, gpr_slice_buffer_add takes ownership. */
- gpr_slice_buffer_add (&calld->slices, gpr_slice_ref (sop->data.slice));
- GPR_ASSERT (GPR_SLICE_LENGTH (sop->data.slice) >= calld->remaining_slice_bytes);
- calld->remaining_slice_bytes -= (gpr_uint32) GPR_SLICE_LENGTH (sop->data.slice);
- if (calld->remaining_slice_bytes == 0)
- {
- did_compress = compress_send_sb (calld->compression_algorithm, &calld->slices);
- }
- break;
- case GRPC_NO_OP:
- break;
- }
+ gpr_slice_buffer_reset_and_unref(&calld->slices);
+ for (i = 0; i < send_ops->nops; ++i) {
+ grpc_stream_op *sop = &send_ops->ops[i];
+ switch (sop->type) {
+ case GRPC_OP_BEGIN_MESSAGE:
+ /* buffer up slices until we've processed all the expected ones (as
+ * given by GRPC_OP_BEGIN_MESSAGE) */
+ calld->remaining_slice_bytes = sop->data.begin_message.length;
+ if (sop->data.begin_message.flags & GRPC_WRITE_NO_COMPRESS) {
+ calld->has_compression_algorithm = 1; /* GPR_TRUE */
+ calld->compression_algorithm = GRPC_COMPRESS_NONE;
+ }
+ break;
+ case GRPC_OP_METADATA:
+ if (!calld->written_initial_metadata) {
+ /* Parse incoming request for compression. If any, it'll be available
+ * at calld->compression_algorithm */
+ grpc_metadata_batch_filter(&(sop->data.metadata),
+ compression_md_filter, elem);
+ if (!calld->has_compression_algorithm) {
+ /* If no algorithm was found in the metadata and we aren't
+ * exceptionally skipping compression, fall back to the channel
+ * default */
+ calld->compression_algorithm =
+ channeld->default_compression_algorithm;
+ calld->has_compression_algorithm = 1; /* GPR_TRUE */
+ }
+ /* hint compression algorithm */
+ grpc_metadata_batch_add_tail(
+ &(sop->data.metadata), &calld->compression_algorithm_storage,
+ GRPC_MDELEM_REF(channeld->mdelem_compression_algorithms
+ [calld->compression_algorithm]));
+
+ /* convey supported compression algorithms */
+ grpc_metadata_batch_add_tail(
+ &(sop->data.metadata), &calld->accept_encoding_storage,
+ GRPC_MDELEM_REF(channeld->mdelem_accept_encoding));
+
+ calld->written_initial_metadata = 1; /* GPR_TRUE */
+ }
+ break;
+ case GRPC_OP_SLICE:
+ if (skip_compression(channeld, calld)) continue;
+ GPR_ASSERT(calld->remaining_slice_bytes > 0);
+ /* Increase input ref count, gpr_slice_buffer_add takes ownership. */
+ gpr_slice_buffer_add(&calld->slices, gpr_slice_ref(sop->data.slice));
+ GPR_ASSERT(GPR_SLICE_LENGTH(sop->data.slice) >=
+ calld->remaining_slice_bytes);
+ calld->remaining_slice_bytes -=
+ (gpr_uint32)GPR_SLICE_LENGTH(sop->data.slice);
+ if (calld->remaining_slice_bytes == 0) {
+ did_compress =
+ compress_send_sb(calld->compression_algorithm, &calld->slices);
+ }
+ break;
+ case GRPC_NO_OP:
+ break;
}
+ }
/* Modify the send_ops stream_op_buffer depending on whether compression was
* carried out */
- if (did_compress)
- {
- finish_compressed_sopb (send_ops, elem);
- }
+ if (did_compress) {
+ finish_compressed_sopb(send_ops, elem);
+ }
}
/* Called either:
@@ -274,52 +268,49 @@ process_send_ops (grpc_call_element * elem, grpc_stream_op_buffer * send_ops)
- a network event (or similar) from below, to receive something
op contains type and call direction information, in addition to the data
that is being sent or received. */
-static void
-compress_start_transport_stream_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op)
-{
- if (op->send_ops && op->send_ops->nops > 0)
- {
- process_send_ops (elem, op->send_ops);
- }
+static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
+ if (op->send_ops && op->send_ops->nops > 0) {
+ process_send_ops(elem, op->send_ops);
+ }
/* pass control down the stack */
- grpc_call_next_op (exec_ctx, elem, op);
+ grpc_call_next_op(exec_ctx, elem, op);
}
/* Constructor for call_data */
-static void
-init_call_elem (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, const void *server_transport_data, grpc_transport_stream_op * initial_op)
-{
+static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ const void *server_transport_data,
+ grpc_transport_stream_op *initial_op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
/* initialize members */
- gpr_slice_buffer_init (&calld->slices);
+ gpr_slice_buffer_init(&calld->slices);
calld->has_compression_algorithm = 0;
- calld->written_initial_metadata = 0; /* GPR_FALSE */
-
- if (initial_op)
- {
- if (initial_op->send_ops && initial_op->send_ops->nops > 0)
- {
- process_send_ops (elem, initial_op->send_ops);
- }
+ calld->written_initial_metadata = 0; /* GPR_FALSE */
+
+ if (initial_op) {
+ if (initial_op->send_ops && initial_op->send_ops->nops > 0) {
+ process_send_ops(elem, initial_op->send_ops);
}
+ }
}
/* Destructor for call_data */
-static void
-destroy_call_elem (grpc_exec_ctx * exec_ctx, grpc_call_element * elem)
-{
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
- gpr_slice_buffer_destroy (&calld->slices);
+ gpr_slice_buffer_destroy(&calld->slices);
}
/* Constructor for channel_data */
-static void
-init_channel_elem (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_channel * master, const grpc_channel_args * args, grpc_mdctx * mdctx, int is_first, int is_last)
-{
+static void init_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem, grpc_channel *master,
+ const grpc_channel_args *args, grpc_mdctx *mdctx,
+ int is_first, int is_last) {
channel_data *channeld = elem->channel_data;
grpc_compression_algorithm algo_idx;
const char *supported_algorithms_names[GRPC_COMPRESS_ALGORITHMS_COUNT - 1];
@@ -327,72 +318,82 @@ init_channel_elem (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_c
char *accept_encoding_str;
size_t accept_encoding_str_len;
- grpc_compression_options_init (&channeld->compression_options);
- channeld->compression_options.enabled_algorithms_bitset = (gpr_uint32) grpc_channel_args_compression_algorithm_get_states (args);
+ grpc_compression_options_init(&channeld->compression_options);
+ channeld->compression_options.enabled_algorithms_bitset =
+ (gpr_uint32)grpc_channel_args_compression_algorithm_get_states(args);
- channeld->default_compression_algorithm = grpc_channel_args_get_compression_algorithm (args);
+ channeld->default_compression_algorithm =
+ grpc_channel_args_get_compression_algorithm(args);
/* Make sure the default isn't disabled. */
- GPR_ASSERT (grpc_compression_options_is_algorithm_enabled (&channeld->compression_options, channeld->default_compression_algorithm));
- channeld->compression_options.default_compression_algorithm = channeld->default_compression_algorithm;
-
- channeld->mdstr_request_compression_algorithm_key = grpc_mdstr_from_string (mdctx, GRPC_COMPRESS_REQUEST_ALGORITHM_KEY, 0);
-
- channeld->mdstr_outgoing_compression_algorithm_key = grpc_mdstr_from_string (mdctx, "grpc-encoding", 0);
-
- channeld->mdstr_compression_capabilities_key = grpc_mdstr_from_string (mdctx, "grpc-accept-encoding", 0);
-
- for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx)
- {
- char *algorithm_name;
- /* skip disabled algorithms */
- if (grpc_compression_options_is_algorithm_enabled (&channeld->compression_options, algo_idx) == 0)
- {
- continue;
- }
- GPR_ASSERT (grpc_compression_algorithm_name (algo_idx, &algorithm_name) != 0);
- channeld->mdelem_compression_algorithms[algo_idx] = grpc_mdelem_from_metadata_strings (mdctx, GRPC_MDSTR_REF (channeld->mdstr_outgoing_compression_algorithm_key), grpc_mdstr_from_string (mdctx, algorithm_name, 0));
- if (algo_idx > 0)
- {
- supported_algorithms_names[supported_algorithms_idx++] = algorithm_name;
- }
+ GPR_ASSERT(grpc_compression_options_is_algorithm_enabled(
+ &channeld->compression_options, channeld->default_compression_algorithm));
+ channeld->compression_options.default_compression_algorithm =
+ channeld->default_compression_algorithm;
+
+ channeld->mdstr_request_compression_algorithm_key =
+ grpc_mdstr_from_string(mdctx, GRPC_COMPRESS_REQUEST_ALGORITHM_KEY, 0);
+
+ channeld->mdstr_outgoing_compression_algorithm_key =
+ grpc_mdstr_from_string(mdctx, "grpc-encoding", 0);
+
+ channeld->mdstr_compression_capabilities_key =
+ grpc_mdstr_from_string(mdctx, "grpc-accept-encoding", 0);
+
+ for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) {
+ char *algorithm_name;
+ /* skip disabled algorithms */
+ if (grpc_compression_options_is_algorithm_enabled(
+ &channeld->compression_options, algo_idx) == 0) {
+ continue;
+ }
+ GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorithm_name) != 0);
+ channeld->mdelem_compression_algorithms[algo_idx] =
+ grpc_mdelem_from_metadata_strings(
+ mdctx,
+ GRPC_MDSTR_REF(channeld->mdstr_outgoing_compression_algorithm_key),
+ grpc_mdstr_from_string(mdctx, algorithm_name, 0));
+ if (algo_idx > 0) {
+ supported_algorithms_names[supported_algorithms_idx++] = algorithm_name;
}
+ }
/* TODO(dgq): gpr_strjoin_sep could be made to work with statically allocated
* arrays, as to avoid the heap allocs */
- accept_encoding_str = gpr_strjoin_sep (supported_algorithms_names, supported_algorithms_idx, ",", &accept_encoding_str_len);
+ accept_encoding_str =
+ gpr_strjoin_sep(supported_algorithms_names, supported_algorithms_idx, ",",
+ &accept_encoding_str_len);
- channeld->mdelem_accept_encoding = grpc_mdelem_from_metadata_strings (mdctx, GRPC_MDSTR_REF (channeld->mdstr_compression_capabilities_key), grpc_mdstr_from_string (mdctx, accept_encoding_str, 0));
- gpr_free (accept_encoding_str);
+ channeld->mdelem_accept_encoding = grpc_mdelem_from_metadata_strings(
+ mdctx, GRPC_MDSTR_REF(channeld->mdstr_compression_capabilities_key),
+ grpc_mdstr_from_string(mdctx, accept_encoding_str, 0));
+ gpr_free(accept_encoding_str);
- GPR_ASSERT (!is_last);
+ GPR_ASSERT(!is_last);
}
/* Destructor for channel data */
-static void
-destroy_channel_elem (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem)
-{
+static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem) {
channel_data *channeld = elem->channel_data;
grpc_compression_algorithm algo_idx;
- GRPC_MDSTR_UNREF (channeld->mdstr_request_compression_algorithm_key);
- GRPC_MDSTR_UNREF (channeld->mdstr_outgoing_compression_algorithm_key);
- GRPC_MDSTR_UNREF (channeld->mdstr_compression_capabilities_key);
- for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx)
- {
- GRPC_MDELEM_UNREF (channeld->mdelem_compression_algorithms[algo_idx]);
- }
- GRPC_MDELEM_UNREF (channeld->mdelem_accept_encoding);
+ GRPC_MDSTR_UNREF(channeld->mdstr_request_compression_algorithm_key);
+ GRPC_MDSTR_UNREF(channeld->mdstr_outgoing_compression_algorithm_key);
+ GRPC_MDSTR_UNREF(channeld->mdstr_compression_capabilities_key);
+ for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) {
+ GRPC_MDELEM_UNREF(channeld->mdelem_compression_algorithms[algo_idx]);
+ }
+ GRPC_MDELEM_UNREF(channeld->mdelem_accept_encoding);
}
const grpc_channel_filter grpc_compress_filter = {
- compress_start_transport_stream_op,
- grpc_channel_next_op,
- sizeof (call_data),
- init_call_elem,
- destroy_call_elem,
- sizeof (channel_data),
- init_channel_elem,
- destroy_channel_elem,
- grpc_call_next_get_peer,
- "compress"
-};
+ compress_start_transport_stream_op,
+ grpc_channel_next_op,
+ sizeof(call_data),
+ init_call_elem,
+ destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ grpc_call_next_get_peer,
+ "compress"};
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c
index bdb75474ff..ea701bc284 100644
--- a/src/core/channel/connected_channel.c
+++ b/src/core/channel/connected_channel.c
@@ -46,15 +46,11 @@
#define MAX_BUFFER_LENGTH 8192
-typedef struct connected_channel_channel_data
-{
+typedef struct connected_channel_channel_data {
grpc_transport *transport;
} channel_data;
-typedef struct connected_channel_call_data
-{
- void *unused;
-} call_data;
+typedef struct connected_channel_call_data { void *unused; } call_data;
/* We perform a small hack to locate transport data alongside the connected
channel data in call allocations, to allow everything to be pulled in minimal
@@ -65,95 +61,95 @@ typedef struct connected_channel_call_data
/* Intercept a call operation and either push it directly up or translate it
into transport stream operations */
-static void
-con_start_transport_stream_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op)
-{
+static void con_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- GPR_ASSERT (elem->filter == &grpc_connected_channel_filter);
- GRPC_CALL_LOG_OP (GPR_INFO, elem, op);
+ GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- grpc_transport_perform_stream_op (exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA (calld), op);
+ grpc_transport_perform_stream_op(exec_ctx, chand->transport,
+ TRANSPORT_STREAM_FROM_CALL_DATA(calld), op);
}
-static void
-con_start_transport_op (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_transport_op * op)
-{
+static void con_start_transport_op(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem,
+ grpc_transport_op *op) {
channel_data *chand = elem->channel_data;
- grpc_transport_perform_op (exec_ctx, chand->transport, op);
+ grpc_transport_perform_op(exec_ctx, chand->transport, op);
}
/* Constructor for call_data */
-static void
-init_call_elem (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, const void *server_transport_data, grpc_transport_stream_op * initial_op)
-{
+static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ const void *server_transport_data,
+ grpc_transport_stream_op *initial_op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
int r;
- GPR_ASSERT (elem->filter == &grpc_connected_channel_filter);
- r = grpc_transport_init_stream (exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA (calld), server_transport_data, initial_op);
- GPR_ASSERT (r == 0);
+ GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
+ r = grpc_transport_init_stream(exec_ctx, chand->transport,
+ TRANSPORT_STREAM_FROM_CALL_DATA(calld),
+ server_transport_data, initial_op);
+ GPR_ASSERT(r == 0);
}
/* Destructor for call_data */
-static void
-destroy_call_elem (grpc_exec_ctx * exec_ctx, grpc_call_element * elem)
-{
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- GPR_ASSERT (elem->filter == &grpc_connected_channel_filter);
- grpc_transport_destroy_stream (exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA (calld));
+ GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
+ grpc_transport_destroy_stream(exec_ctx, chand->transport,
+ TRANSPORT_STREAM_FROM_CALL_DATA(calld));
}
/* Constructor for channel_data */
-static void
-init_channel_elem (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_channel * master, const grpc_channel_args * args, grpc_mdctx * mdctx, int is_first, int is_last)
-{
- channel_data *cd = (channel_data *) elem->channel_data;
- GPR_ASSERT (is_last);
- GPR_ASSERT (elem->filter == &grpc_connected_channel_filter);
+static void init_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem, grpc_channel *master,
+ const grpc_channel_args *args, grpc_mdctx *mdctx,
+ int is_first, int is_last) {
+ channel_data *cd = (channel_data *)elem->channel_data;
+ GPR_ASSERT(is_last);
+ GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
cd->transport = NULL;
}
/* Destructor for channel_data */
-static void
-destroy_channel_elem (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem)
-{
- channel_data *cd = (channel_data *) elem->channel_data;
- GPR_ASSERT (elem->filter == &grpc_connected_channel_filter);
- grpc_transport_destroy (exec_ctx, cd->transport);
+static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem) {
+ channel_data *cd = (channel_data *)elem->channel_data;
+ GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
+ grpc_transport_destroy(exec_ctx, cd->transport);
}
-static char *
-con_get_peer (grpc_exec_ctx * exec_ctx, grpc_call_element * elem)
-{
+static char *con_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
- return grpc_transport_get_peer (exec_ctx, chand->transport);
+ return grpc_transport_get_peer(exec_ctx, chand->transport);
}
const grpc_channel_filter grpc_connected_channel_filter = {
- con_start_transport_stream_op,
- con_start_transport_op,
- sizeof (call_data),
- init_call_elem,
- destroy_call_elem,
- sizeof (channel_data),
- init_channel_elem,
- destroy_channel_elem,
- con_get_peer,
- "connected",
+ con_start_transport_stream_op,
+ con_start_transport_op,
+ sizeof(call_data),
+ init_call_elem,
+ destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ con_get_peer,
+ "connected",
};
-void
-grpc_connected_channel_bind_transport (grpc_channel_stack * channel_stack, grpc_transport * transport)
-{
+void grpc_connected_channel_bind_transport(grpc_channel_stack *channel_stack,
+ grpc_transport *transport) {
/* Assumes that the connected channel filter is always the last filter
in a channel stack */
- grpc_channel_element *elem = grpc_channel_stack_last_element (channel_stack);
- channel_data *cd = (channel_data *) elem->channel_data;
- GPR_ASSERT (elem->filter == &grpc_connected_channel_filter);
- GPR_ASSERT (cd->transport == NULL);
+ grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
+ channel_data *cd = (channel_data *)elem->channel_data;
+ GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
+ GPR_ASSERT(cd->transport == NULL);
cd->transport = transport;
/* HACK(ctiller): increase call stack size for the channel to make space
@@ -162,5 +158,5 @@ grpc_connected_channel_bind_transport (grpc_channel_stack * channel_stack, grpc_
This is only "safe" because call stacks place no additional data after
the last call element, and the last call element MUST be the connected
channel. */
- channel_stack->call_stack_size += grpc_transport_stream_size (transport);
+ channel_stack->call_stack_size += grpc_transport_stream_size(transport);
}
diff --git a/src/core/channel/connected_channel.h b/src/core/channel/connected_channel.h
index 67bcd6c911..eac6eb7ebe 100644
--- a/src/core/channel/connected_channel.h
+++ b/src/core/channel/connected_channel.h
@@ -43,6 +43,7 @@ extern const grpc_channel_filter grpc_connected_channel_filter;
/* Post construction fixup: set the transport in the connected channel.
Must be called before any call stack using this filter is used. */
-void grpc_connected_channel_bind_transport (grpc_channel_stack * channel_stack, grpc_transport * transport);
+void grpc_connected_channel_bind_transport(grpc_channel_stack* channel_stack,
+ grpc_transport* transport);
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CONNECTED_CHANNEL_H */
diff --git a/src/core/channel/context.h b/src/core/channel/context.h
index 076dc7189b..ac5796b9ef 100644
--- a/src/core/channel/context.h
+++ b/src/core/channel/context.h
@@ -35,17 +35,15 @@
#define GRPC_INTERNAL_CORE_CHANNEL_CONTEXT_H
/* Call object context pointers */
-typedef enum
-{
+typedef enum {
GRPC_CONTEXT_SECURITY = 0,
GRPC_CONTEXT_TRACING,
GRPC_CONTEXT_COUNT
} grpc_context_index;
-typedef struct
-{
+typedef struct {
void *value;
- void (*destroy) (void *);
+ void (*destroy)(void *);
} grpc_call_context_element;
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CONTEXT_H */
diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c
index 3421bdb7b1..721ed4da39 100644
--- a/src/core/channel/http_client_filter.c
+++ b/src/core/channel/http_client_filter.c
@@ -37,8 +37,7 @@
#include <grpc/support/string_util.h>
#include "src/core/support/string.h"
-typedef struct call_data
-{
+typedef struct call_data {
grpc_linked_mdelem method;
grpc_linked_mdelem scheme;
grpc_linked_mdelem authority;
@@ -58,8 +57,7 @@ typedef struct call_data
grpc_closure hc_on_recv;
} call_data;
-typedef struct channel_data
-{
+typedef struct channel_data {
grpc_mdelem *te_trailers;
grpc_mdelem *method;
grpc_mdelem *scheme;
@@ -69,255 +67,222 @@ typedef struct channel_data
grpc_mdelem *user_agent;
} channel_data;
-typedef struct
-{
+typedef struct {
grpc_call_element *elem;
grpc_exec_ctx *exec_ctx;
} client_recv_filter_args;
-static grpc_mdelem *
-client_recv_filter (void *user_data, grpc_mdelem * md)
-{
+static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) {
client_recv_filter_args *a = user_data;
grpc_call_element *elem = a->elem;
channel_data *channeld = elem->channel_data;
- if (md == channeld->status)
- {
- return NULL;
- }
- else if (md->key == channeld->status->key)
- {
- grpc_call_element_send_cancel (a->exec_ctx, elem);
- return NULL;
- }
- else if (md->key == channeld->content_type->key)
- {
- return NULL;
- }
+ if (md == channeld->status) {
+ return NULL;
+ } else if (md->key == channeld->status->key) {
+ grpc_call_element_send_cancel(a->exec_ctx, elem);
+ return NULL;
+ } else if (md->key == channeld->content_type->key) {
+ return NULL;
+ }
return md;
}
-static void
-hc_on_recv (grpc_exec_ctx * exec_ctx, void *user_data, int success)
-{
+static void hc_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, int success) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
size_t i;
size_t nops = calld->recv_ops->nops;
grpc_stream_op *ops = calld->recv_ops->ops;
- for (i = 0; i < nops; i++)
- {
- grpc_stream_op *op = &ops[i];
- client_recv_filter_args a;
- if (op->type != GRPC_OP_METADATA)
- continue;
- calld->got_initial_metadata = 1;
- a.elem = elem;
- a.exec_ctx = exec_ctx;
- grpc_metadata_batch_filter (&op->data.metadata, client_recv_filter, &a);
- }
- calld->on_done_recv->cb (exec_ctx, calld->on_done_recv->cb_arg, success);
+ for (i = 0; i < nops; i++) {
+ grpc_stream_op *op = &ops[i];
+ client_recv_filter_args a;
+ if (op->type != GRPC_OP_METADATA) continue;
+ calld->got_initial_metadata = 1;
+ a.elem = elem;
+ a.exec_ctx = exec_ctx;
+ grpc_metadata_batch_filter(&op->data.metadata, client_recv_filter, &a);
+ }
+ calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, success);
}
-static grpc_mdelem *
-client_strip_filter (void *user_data, grpc_mdelem * md)
-{
+static grpc_mdelem *client_strip_filter(void *user_data, grpc_mdelem *md) {
grpc_call_element *elem = user_data;
channel_data *channeld = elem->channel_data;
/* eat the things we'd like to set ourselves */
- if (md->key == channeld->method->key)
- return NULL;
- if (md->key == channeld->scheme->key)
- return NULL;
- if (md->key == channeld->te_trailers->key)
- return NULL;
- if (md->key == channeld->content_type->key)
- return NULL;
- if (md->key == channeld->user_agent->key)
- return NULL;
+ if (md->key == channeld->method->key) return NULL;
+ if (md->key == channeld->scheme->key) return NULL;
+ if (md->key == channeld->te_trailers->key) return NULL;
+ if (md->key == channeld->content_type->key) return NULL;
+ if (md->key == channeld->user_agent->key) return NULL;
return md;
}
-static void
-hc_mutate_op (grpc_call_element * elem, grpc_transport_stream_op * op)
-{
+static void hc_mutate_op(grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
size_t i;
- if (op->send_ops && !calld->sent_initial_metadata)
- {
- size_t nops = op->send_ops->nops;
- grpc_stream_op *ops = op->send_ops->ops;
- for (i = 0; i < nops; i++)
- {
- grpc_stream_op *op = &ops[i];
- if (op->type != GRPC_OP_METADATA)
- continue;
- calld->sent_initial_metadata = 1;
- grpc_metadata_batch_filter (&op->data.metadata, client_strip_filter, elem);
- /* Send : prefixed headers, which have to be before any application
- layer headers. */
- grpc_metadata_batch_add_head (&op->data.metadata, &calld->method, GRPC_MDELEM_REF (channeld->method));
- grpc_metadata_batch_add_head (&op->data.metadata, &calld->scheme, GRPC_MDELEM_REF (channeld->scheme));
- grpc_metadata_batch_add_tail (&op->data.metadata, &calld->te_trailers, GRPC_MDELEM_REF (channeld->te_trailers));
- grpc_metadata_batch_add_tail (&op->data.metadata, &calld->content_type, GRPC_MDELEM_REF (channeld->content_type));
- grpc_metadata_batch_add_tail (&op->data.metadata, &calld->user_agent, GRPC_MDELEM_REF (channeld->user_agent));
- break;
- }
+ if (op->send_ops && !calld->sent_initial_metadata) {
+ size_t nops = op->send_ops->nops;
+ grpc_stream_op *ops = op->send_ops->ops;
+ for (i = 0; i < nops; i++) {
+ grpc_stream_op *op = &ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
+ calld->sent_initial_metadata = 1;
+ grpc_metadata_batch_filter(&op->data.metadata, client_strip_filter, elem);
+ /* Send : prefixed headers, which have to be before any application
+ layer headers. */
+ grpc_metadata_batch_add_head(&op->data.metadata, &calld->method,
+ GRPC_MDELEM_REF(channeld->method));
+ grpc_metadata_batch_add_head(&op->data.metadata, &calld->scheme,
+ GRPC_MDELEM_REF(channeld->scheme));
+ grpc_metadata_batch_add_tail(&op->data.metadata, &calld->te_trailers,
+ GRPC_MDELEM_REF(channeld->te_trailers));
+ grpc_metadata_batch_add_tail(&op->data.metadata, &calld->content_type,
+ GRPC_MDELEM_REF(channeld->content_type));
+ grpc_metadata_batch_add_tail(&op->data.metadata, &calld->user_agent,
+ GRPC_MDELEM_REF(channeld->user_agent));
+ break;
}
+ }
- if (op->recv_ops && !calld->got_initial_metadata)
- {
- /* substitute our callback for the higher callback */
- calld->recv_ops = op->recv_ops;
- calld->on_done_recv = op->on_done_recv;
- op->on_done_recv = &calld->hc_on_recv;
- }
+ if (op->recv_ops && !calld->got_initial_metadata) {
+ /* substitute our callback for the higher callback */
+ calld->recv_ops = op->recv_ops;
+ calld->on_done_recv = op->on_done_recv;
+ op->on_done_recv = &calld->hc_on_recv;
+ }
}
-static void
-hc_start_transport_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op)
-{
- GRPC_CALL_LOG_OP (GPR_INFO, elem, op);
- hc_mutate_op (elem, op);
- grpc_call_next_op (exec_ctx, elem, op);
+static void hc_start_transport_op(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ hc_mutate_op(elem, op);
+ grpc_call_next_op(exec_ctx, elem, op);
}
/* Constructor for call_data */
-static void
-init_call_elem (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, const void *server_transport_data, grpc_transport_stream_op * initial_op)
-{
+static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ const void *server_transport_data,
+ grpc_transport_stream_op *initial_op) {
call_data *calld = elem->call_data;
calld->sent_initial_metadata = 0;
calld->got_initial_metadata = 0;
calld->on_done_recv = NULL;
- grpc_closure_init (&calld->hc_on_recv, hc_on_recv, elem);
- if (initial_op)
- hc_mutate_op (elem, initial_op);
+ grpc_closure_init(&calld->hc_on_recv, hc_on_recv, elem);
+ if (initial_op) hc_mutate_op(elem, initial_op);
}
/* Destructor for call_data */
-static void
-destroy_call_elem (grpc_exec_ctx * exec_ctx, grpc_call_element * elem)
-{
-}
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {}
-static const char *
-scheme_from_args (const grpc_channel_args * args)
-{
+static const char *scheme_from_args(const grpc_channel_args *args) {
unsigned i;
- if (args != NULL)
- {
- for (i = 0; i < args->num_args; ++i)
- {
- if (args->args[i].type == GRPC_ARG_STRING && strcmp (args->args[i].key, GRPC_ARG_HTTP2_SCHEME) == 0)
- {
- return args->args[i].value.string;
- }
- }
+ if (args != NULL) {
+ for (i = 0; i < args->num_args; ++i) {
+ if (args->args[i].type == GRPC_ARG_STRING &&
+ strcmp(args->args[i].key, GRPC_ARG_HTTP2_SCHEME) == 0) {
+ return args->args[i].value.string;
+ }
}
+ }
return "http";
}
-static grpc_mdstr *
-user_agent_from_args (grpc_mdctx * mdctx, const grpc_channel_args * args)
-{
+static grpc_mdstr *user_agent_from_args(grpc_mdctx *mdctx,
+ const grpc_channel_args *args) {
gpr_strvec v;
size_t i;
int is_first = 1;
char *tmp;
grpc_mdstr *result;
- gpr_strvec_init (&v);
+ gpr_strvec_init(&v);
- for (i = 0; args && i < args->num_args; i++)
- {
- if (0 == strcmp (args->args[i].key, GRPC_ARG_PRIMARY_USER_AGENT_STRING))
- {
- if (args->args[i].type != GRPC_ARG_STRING)
- {
- gpr_log (GPR_ERROR, "Channel argument '%s' should be a string", GRPC_ARG_PRIMARY_USER_AGENT_STRING);
- }
- else
- {
- if (!is_first)
- gpr_strvec_add (&v, gpr_strdup (" "));
- is_first = 0;
- gpr_strvec_add (&v, gpr_strdup (args->args[i].value.string));
- }
- }
+ for (i = 0; args && i < args->num_args; i++) {
+ if (0 == strcmp(args->args[i].key, GRPC_ARG_PRIMARY_USER_AGENT_STRING)) {
+ if (args->args[i].type != GRPC_ARG_STRING) {
+ gpr_log(GPR_ERROR, "Channel argument '%s' should be a string",
+ GRPC_ARG_PRIMARY_USER_AGENT_STRING);
+ } else {
+ if (!is_first) gpr_strvec_add(&v, gpr_strdup(" "));
+ is_first = 0;
+ gpr_strvec_add(&v, gpr_strdup(args->args[i].value.string));
+ }
}
+ }
- gpr_asprintf (&tmp, "%sgrpc-c/%s (%s)", is_first ? "" : " ", grpc_version_string (), GPR_PLATFORM_STRING);
+ gpr_asprintf(&tmp, "%sgrpc-c/%s (%s)", is_first ? "" : " ",
+ grpc_version_string(), GPR_PLATFORM_STRING);
is_first = 0;
- gpr_strvec_add (&v, tmp);
+ gpr_strvec_add(&v, tmp);
- for (i = 0; args && i < args->num_args; i++)
- {
- if (0 == strcmp (args->args[i].key, GRPC_ARG_SECONDARY_USER_AGENT_STRING))
- {
- if (args->args[i].type != GRPC_ARG_STRING)
- {
- gpr_log (GPR_ERROR, "Channel argument '%s' should be a string", GRPC_ARG_SECONDARY_USER_AGENT_STRING);
- }
- else
- {
- if (!is_first)
- gpr_strvec_add (&v, gpr_strdup (" "));
- is_first = 0;
- gpr_strvec_add (&v, gpr_strdup (args->args[i].value.string));
- }
- }
+ for (i = 0; args && i < args->num_args; i++) {
+ if (0 == strcmp(args->args[i].key, GRPC_ARG_SECONDARY_USER_AGENT_STRING)) {
+ if (args->args[i].type != GRPC_ARG_STRING) {
+ gpr_log(GPR_ERROR, "Channel argument '%s' should be a string",
+ GRPC_ARG_SECONDARY_USER_AGENT_STRING);
+ } else {
+ if (!is_first) gpr_strvec_add(&v, gpr_strdup(" "));
+ is_first = 0;
+ gpr_strvec_add(&v, gpr_strdup(args->args[i].value.string));
+ }
}
+ }
- tmp = gpr_strvec_flatten (&v, NULL);
- gpr_strvec_destroy (&v);
- result = grpc_mdstr_from_string (mdctx, tmp, 0);
- gpr_free (tmp);
+ tmp = gpr_strvec_flatten(&v, NULL);
+ gpr_strvec_destroy(&v);
+ result = grpc_mdstr_from_string(mdctx, tmp, 0);
+ gpr_free(tmp);
return result;
}
/* Constructor for channel_data */
-static void
-init_channel_elem (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_channel * master, const grpc_channel_args * channel_args, grpc_mdctx * mdctx, int is_first, int is_last)
-{
+static void init_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem, grpc_channel *master,
+ const grpc_channel_args *channel_args,
+ grpc_mdctx *mdctx, int is_first, int is_last) {
/* grab pointers to our data from the channel element */
channel_data *channeld = elem->channel_data;
/* The first and the last filters tend to be implemented differently to
handle the case that there's no 'next' filter to call on the up or down
path */
- GPR_ASSERT (!is_last);
+ GPR_ASSERT(!is_last);
/* initialize members */
- channeld->te_trailers = grpc_mdelem_from_strings (mdctx, "te", "trailers");
- channeld->method = grpc_mdelem_from_strings (mdctx, ":method", "POST");
- channeld->scheme = grpc_mdelem_from_strings (mdctx, ":scheme", scheme_from_args (channel_args));
- channeld->content_type = grpc_mdelem_from_strings (mdctx, "content-type", "application/grpc");
- channeld->status = grpc_mdelem_from_strings (mdctx, ":status", "200");
- channeld->user_agent = grpc_mdelem_from_metadata_strings (mdctx, grpc_mdstr_from_string (mdctx, "user-agent", 0), user_agent_from_args (mdctx, channel_args));
+ channeld->te_trailers = grpc_mdelem_from_strings(mdctx, "te", "trailers");
+ channeld->method = grpc_mdelem_from_strings(mdctx, ":method", "POST");
+ channeld->scheme = grpc_mdelem_from_strings(mdctx, ":scheme",
+ scheme_from_args(channel_args));
+ channeld->content_type =
+ grpc_mdelem_from_strings(mdctx, "content-type", "application/grpc");
+ channeld->status = grpc_mdelem_from_strings(mdctx, ":status", "200");
+ channeld->user_agent = grpc_mdelem_from_metadata_strings(
+ mdctx, grpc_mdstr_from_string(mdctx, "user-agent", 0),
+ user_agent_from_args(mdctx, channel_args));
}
/* Destructor for channel data */
-static void
-destroy_channel_elem (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem)
-{
+static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem) {
/* grab pointers to our data from the channel element */
channel_data *channeld = elem->channel_data;
- GRPC_MDELEM_UNREF (channeld->te_trailers);
- GRPC_MDELEM_UNREF (channeld->method);
- GRPC_MDELEM_UNREF (channeld->scheme);
- GRPC_MDELEM_UNREF (channeld->content_type);
- GRPC_MDELEM_UNREF (channeld->status);
- GRPC_MDELEM_UNREF (channeld->user_agent);
+ GRPC_MDELEM_UNREF(channeld->te_trailers);
+ GRPC_MDELEM_UNREF(channeld->method);
+ GRPC_MDELEM_UNREF(channeld->scheme);
+ GRPC_MDELEM_UNREF(channeld->content_type);
+ GRPC_MDELEM_UNREF(channeld->status);
+ GRPC_MDELEM_UNREF(channeld->user_agent);
}
const grpc_channel_filter grpc_http_client_filter = {
- hc_start_transport_op, grpc_channel_next_op, sizeof (call_data),
- init_call_elem, destroy_call_elem, sizeof (channel_data),
- init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer,
- "http-client"
-};
+ hc_start_transport_op, grpc_channel_next_op, sizeof(call_data),
+ init_call_elem, destroy_call_elem, sizeof(channel_data),
+ init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer,
+ "http-client"};
diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c
index 6f776b8431..02a351d7b1 100644
--- a/src/core/channel/http_server_filter.c
+++ b/src/core/channel/http_server_filter.c
@@ -37,8 +37,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-typedef struct call_data
-{
+typedef struct call_data {
gpr_uint8 got_initial_metadata;
gpr_uint8 seen_path;
gpr_uint8 seen_post;
@@ -58,8 +57,7 @@ typedef struct call_data
grpc_closure hs_on_recv;
} call_data;
-typedef struct channel_data
-{
+typedef struct channel_data {
grpc_mdelem *te_trailers;
grpc_mdelem *method_post;
grpc_mdelem *http_scheme;
@@ -76,268 +74,234 @@ typedef struct channel_data
grpc_mdctx *mdctx;
} channel_data;
-typedef struct
-{
+typedef struct {
grpc_call_element *elem;
grpc_exec_ctx *exec_ctx;
} server_filter_args;
-static grpc_mdelem *
-server_filter (void *user_data, grpc_mdelem * md)
-{
+static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
server_filter_args *a = user_data;
grpc_call_element *elem = a->elem;
channel_data *channeld = elem->channel_data;
call_data *calld = elem->call_data;
/* Check if it is one of the headers we care about. */
- if (md == channeld->te_trailers || md == channeld->method_post || md == channeld->http_scheme || md == channeld->https_scheme || md == channeld->grpc_scheme || md == channeld->content_type)
- {
- /* swallow it */
- if (md == channeld->method_post)
- {
- calld->seen_post = 1;
- }
- else if (md->key == channeld->http_scheme->key)
- {
- calld->seen_scheme = 1;
- }
- else if (md == channeld->te_trailers)
- {
- calld->seen_te_trailers = 1;
- }
- /* TODO(klempner): Track that we've seen all the headers we should
- require */
- return NULL;
+ if (md == channeld->te_trailers || md == channeld->method_post ||
+ md == channeld->http_scheme || md == channeld->https_scheme ||
+ md == channeld->grpc_scheme || md == channeld->content_type) {
+ /* swallow it */
+ if (md == channeld->method_post) {
+ calld->seen_post = 1;
+ } else if (md->key == channeld->http_scheme->key) {
+ calld->seen_scheme = 1;
+ } else if (md == channeld->te_trailers) {
+ calld->seen_te_trailers = 1;
}
- else if (md->key == channeld->content_type->key)
- {
- if (strncmp (grpc_mdstr_as_c_string (md->value), "application/grpc+", 17) == 0)
- {
- /* Although the C implementation doesn't (currently) generate them,
- any custom +-suffix is explicitly valid. */
- /* TODO(klempner): We should consider preallocating common values such
- as +proto or +json, or at least stashing them if we see them. */
- /* TODO(klempner): Should we be surfacing this to application code? */
- }
- else
- {
- /* TODO(klempner): We're currently allowing this, but we shouldn't
- see it without a proxy so log for now. */
- gpr_log (GPR_INFO, "Unexpected content-type %s", channeld->content_type->key);
- }
- return NULL;
+ /* TODO(klempner): Track that we've seen all the headers we should
+ require */
+ return NULL;
+ } else if (md->key == channeld->content_type->key) {
+ if (strncmp(grpc_mdstr_as_c_string(md->value), "application/grpc+", 17) ==
+ 0) {
+ /* Although the C implementation doesn't (currently) generate them,
+ any custom +-suffix is explicitly valid. */
+ /* TODO(klempner): We should consider preallocating common values such
+ as +proto or +json, or at least stashing them if we see them. */
+ /* TODO(klempner): Should we be surfacing this to application code? */
+ } else {
+ /* TODO(klempner): We're currently allowing this, but we shouldn't
+ see it without a proxy so log for now. */
+ gpr_log(GPR_INFO, "Unexpected content-type %s",
+ channeld->content_type->key);
}
- else if (md->key == channeld->te_trailers->key || md->key == channeld->method_post->key || md->key == channeld->http_scheme->key)
- {
- gpr_log (GPR_ERROR, "Invalid %s: header: '%s'", grpc_mdstr_as_c_string (md->key), grpc_mdstr_as_c_string (md->value));
- /* swallow it and error everything out. */
- /* TODO(klempner): We ought to generate more descriptive error messages
- on the wire here. */
- grpc_call_element_send_cancel (a->exec_ctx, elem);
+ return NULL;
+ } else if (md->key == channeld->te_trailers->key ||
+ md->key == channeld->method_post->key ||
+ md->key == channeld->http_scheme->key) {
+ gpr_log(GPR_ERROR, "Invalid %s: header: '%s'",
+ grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value));
+ /* swallow it and error everything out. */
+ /* TODO(klempner): We ought to generate more descriptive error messages
+ on the wire here. */
+ grpc_call_element_send_cancel(a->exec_ctx, elem);
+ return NULL;
+ } else if (md->key == channeld->path_key) {
+ if (calld->seen_path) {
+ gpr_log(GPR_ERROR, "Received :path twice");
return NULL;
}
- else if (md->key == channeld->path_key)
- {
- if (calld->seen_path)
- {
- gpr_log (GPR_ERROR, "Received :path twice");
- return NULL;
- }
- calld->seen_path = 1;
- return md;
- }
- else if (md->key == channeld->authority_key)
- {
- calld->seen_authority = 1;
- return md;
- }
- else if (md->key == channeld->host_key)
- {
- /* translate host to :authority since :authority may be
- omitted */
- grpc_mdelem *authority = grpc_mdelem_from_metadata_strings (channeld->mdctx, GRPC_MDSTR_REF (channeld->authority_key),
- GRPC_MDSTR_REF (md->value));
- GRPC_MDELEM_UNREF (md);
- calld->seen_authority = 1;
- return authority;
- }
- else
- {
- return md;
- }
+ calld->seen_path = 1;
+ return md;
+ } else if (md->key == channeld->authority_key) {
+ calld->seen_authority = 1;
+ return md;
+ } else if (md->key == channeld->host_key) {
+ /* translate host to :authority since :authority may be
+ omitted */
+ grpc_mdelem *authority = grpc_mdelem_from_metadata_strings(
+ channeld->mdctx, GRPC_MDSTR_REF(channeld->authority_key),
+ GRPC_MDSTR_REF(md->value));
+ GRPC_MDELEM_UNREF(md);
+ calld->seen_authority = 1;
+ return authority;
+ } else {
+ return md;
+ }
}
-static void
-hs_on_recv (grpc_exec_ctx * exec_ctx, void *user_data, int success)
-{
+static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, int success) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
- if (success)
- {
- size_t i;
- size_t nops = calld->recv_ops->nops;
- grpc_stream_op *ops = calld->recv_ops->ops;
- for (i = 0; i < nops; i++)
- {
- grpc_stream_op *op = &ops[i];
- server_filter_args a;
- if (op->type != GRPC_OP_METADATA)
- continue;
- calld->got_initial_metadata = 1;
- a.elem = elem;
- a.exec_ctx = exec_ctx;
- grpc_metadata_batch_filter (&op->data.metadata, server_filter, &a);
- /* Have we seen the required http2 transport headers?
- (:method, :scheme, content-type, with :path and :authority covered
- at the channel level right now) */
- if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers && calld->seen_path && calld->seen_authority)
- {
- /* do nothing */
- }
- else
- {
- if (!calld->seen_path)
- {
- gpr_log (GPR_ERROR, "Missing :path header");
- }
- if (!calld->seen_authority)
- {
- gpr_log (GPR_ERROR, "Missing :authority header");
- }
- if (!calld->seen_post)
- {
- gpr_log (GPR_ERROR, "Missing :method header");
- }
- if (!calld->seen_scheme)
- {
- gpr_log (GPR_ERROR, "Missing :scheme header");
- }
- if (!calld->seen_te_trailers)
- {
- gpr_log (GPR_ERROR, "Missing te trailers header");
- }
- /* Error this call out */
- success = 0;
- grpc_call_element_send_cancel (exec_ctx, elem);
- }
- }
+ if (success) {
+ size_t i;
+ size_t nops = calld->recv_ops->nops;
+ grpc_stream_op *ops = calld->recv_ops->ops;
+ for (i = 0; i < nops; i++) {
+ grpc_stream_op *op = &ops[i];
+ server_filter_args a;
+ if (op->type != GRPC_OP_METADATA) continue;
+ calld->got_initial_metadata = 1;
+ a.elem = elem;
+ a.exec_ctx = exec_ctx;
+ grpc_metadata_batch_filter(&op->data.metadata, server_filter, &a);
+ /* Have we seen the required http2 transport headers?
+ (:method, :scheme, content-type, with :path and :authority covered
+ at the channel level right now) */
+ if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers &&
+ calld->seen_path && calld->seen_authority) {
+ /* do nothing */
+ } else {
+ if (!calld->seen_path) {
+ gpr_log(GPR_ERROR, "Missing :path header");
+ }
+ if (!calld->seen_authority) {
+ gpr_log(GPR_ERROR, "Missing :authority header");
+ }
+ if (!calld->seen_post) {
+ gpr_log(GPR_ERROR, "Missing :method header");
+ }
+ if (!calld->seen_scheme) {
+ gpr_log(GPR_ERROR, "Missing :scheme header");
+ }
+ if (!calld->seen_te_trailers) {
+ gpr_log(GPR_ERROR, "Missing te trailers header");
+ }
+ /* Error this call out */
+ success = 0;
+ grpc_call_element_send_cancel(exec_ctx, elem);
+ }
}
- calld->on_done_recv->cb (exec_ctx, calld->on_done_recv->cb_arg, success);
+ }
+ calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, success);
}
-static void
-hs_mutate_op (grpc_call_element * elem, grpc_transport_stream_op * op)
-{
+static void hs_mutate_op(grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
size_t i;
- if (op->send_ops && !calld->sent_status)
- {
- size_t nops = op->send_ops->nops;
- grpc_stream_op *ops = op->send_ops->ops;
- for (i = 0; i < nops; i++)
- {
- grpc_stream_op *op = &ops[i];
- if (op->type != GRPC_OP_METADATA)
- continue;
- calld->sent_status = 1;
- grpc_metadata_batch_add_head (&op->data.metadata, &calld->status, GRPC_MDELEM_REF (channeld->status_ok));
- grpc_metadata_batch_add_tail (&op->data.metadata, &calld->content_type, GRPC_MDELEM_REF (channeld->content_type));
- break;
- }
+ if (op->send_ops && !calld->sent_status) {
+ size_t nops = op->send_ops->nops;
+ grpc_stream_op *ops = op->send_ops->ops;
+ for (i = 0; i < nops; i++) {
+ grpc_stream_op *op = &ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
+ calld->sent_status = 1;
+ grpc_metadata_batch_add_head(&op->data.metadata, &calld->status,
+ GRPC_MDELEM_REF(channeld->status_ok));
+ grpc_metadata_batch_add_tail(&op->data.metadata, &calld->content_type,
+ GRPC_MDELEM_REF(channeld->content_type));
+ break;
}
+ }
- if (op->recv_ops && !calld->got_initial_metadata)
- {
- /* substitute our callback for the higher callback */
- calld->recv_ops = op->recv_ops;
- calld->on_done_recv = op->on_done_recv;
- op->on_done_recv = &calld->hs_on_recv;
- }
+ if (op->recv_ops && !calld->got_initial_metadata) {
+ /* substitute our callback for the higher callback */
+ calld->recv_ops = op->recv_ops;
+ calld->on_done_recv = op->on_done_recv;
+ op->on_done_recv = &calld->hs_on_recv;
+ }
}
-static void
-hs_start_transport_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op)
-{
- GRPC_CALL_LOG_OP (GPR_INFO, elem, op);
- hs_mutate_op (elem, op);
- grpc_call_next_op (exec_ctx, elem, op);
+static void hs_start_transport_op(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ hs_mutate_op(elem, op);
+ grpc_call_next_op(exec_ctx, elem, op);
}
/* Constructor for call_data */
-static void
-init_call_elem (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, const void *server_transport_data, grpc_transport_stream_op * initial_op)
-{
+static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ const void *server_transport_data,
+ grpc_transport_stream_op *initial_op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
/* initialize members */
- memset (calld, 0, sizeof (*calld));
- grpc_closure_init (&calld->hs_on_recv, hs_on_recv, elem);
- if (initial_op)
- hs_mutate_op (elem, initial_op);
+ memset(calld, 0, sizeof(*calld));
+ grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem);
+ if (initial_op) hs_mutate_op(elem, initial_op);
}
/* Destructor for call_data */
-static void
-destroy_call_elem (grpc_exec_ctx * exec_ctx, grpc_call_element * elem)
-{
-}
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {}
/* Constructor for channel_data */
-static void
-init_channel_elem (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_channel * master, const grpc_channel_args * args, grpc_mdctx * mdctx, int is_first, int is_last)
-{
+static void init_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem, grpc_channel *master,
+ const grpc_channel_args *args, grpc_mdctx *mdctx,
+ int is_first, int is_last) {
/* grab pointers to our data from the channel element */
channel_data *channeld = elem->channel_data;
/* The first and the last filters tend to be implemented differently to
handle the case that there's no 'next' filter to call on the up or down
path */
- GPR_ASSERT (!is_first);
- GPR_ASSERT (!is_last);
+ GPR_ASSERT(!is_first);
+ GPR_ASSERT(!is_last);
/* initialize members */
- channeld->te_trailers = grpc_mdelem_from_strings (mdctx, "te", "trailers");
- channeld->status_ok = grpc_mdelem_from_strings (mdctx, ":status", "200");
- channeld->status_not_found = grpc_mdelem_from_strings (mdctx, ":status", "404");
- channeld->method_post = grpc_mdelem_from_strings (mdctx, ":method", "POST");
- channeld->http_scheme = grpc_mdelem_from_strings (mdctx, ":scheme", "http");
- channeld->https_scheme = grpc_mdelem_from_strings (mdctx, ":scheme", "https");
- channeld->grpc_scheme = grpc_mdelem_from_strings (mdctx, ":scheme", "grpc");
- channeld->path_key = grpc_mdstr_from_string (mdctx, ":path", 0);
- channeld->authority_key = grpc_mdstr_from_string (mdctx, ":authority", 0);
- channeld->host_key = grpc_mdstr_from_string (mdctx, "host", 0);
- channeld->content_type = grpc_mdelem_from_strings (mdctx, "content-type", "application/grpc");
+ channeld->te_trailers = grpc_mdelem_from_strings(mdctx, "te", "trailers");
+ channeld->status_ok = grpc_mdelem_from_strings(mdctx, ":status", "200");
+ channeld->status_not_found =
+ grpc_mdelem_from_strings(mdctx, ":status", "404");
+ channeld->method_post = grpc_mdelem_from_strings(mdctx, ":method", "POST");
+ channeld->http_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "http");
+ channeld->https_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "https");
+ channeld->grpc_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "grpc");
+ channeld->path_key = grpc_mdstr_from_string(mdctx, ":path", 0);
+ channeld->authority_key = grpc_mdstr_from_string(mdctx, ":authority", 0);
+ channeld->host_key = grpc_mdstr_from_string(mdctx, "host", 0);
+ channeld->content_type =
+ grpc_mdelem_from_strings(mdctx, "content-type", "application/grpc");
channeld->mdctx = mdctx;
}
/* Destructor for channel data */
-static void
-destroy_channel_elem (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem)
-{
+static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem) {
/* grab pointers to our data from the channel element */
channel_data *channeld = elem->channel_data;
- GRPC_MDELEM_UNREF (channeld->te_trailers);
- GRPC_MDELEM_UNREF (channeld->status_ok);
- GRPC_MDELEM_UNREF (channeld->status_not_found);
- GRPC_MDELEM_UNREF (channeld->method_post);
- GRPC_MDELEM_UNREF (channeld->http_scheme);
- GRPC_MDELEM_UNREF (channeld->https_scheme);
- GRPC_MDELEM_UNREF (channeld->grpc_scheme);
- GRPC_MDELEM_UNREF (channeld->content_type);
- GRPC_MDSTR_UNREF (channeld->path_key);
- GRPC_MDSTR_UNREF (channeld->authority_key);
- GRPC_MDSTR_UNREF (channeld->host_key);
+ GRPC_MDELEM_UNREF(channeld->te_trailers);
+ GRPC_MDELEM_UNREF(channeld->status_ok);
+ GRPC_MDELEM_UNREF(channeld->status_not_found);
+ GRPC_MDELEM_UNREF(channeld->method_post);
+ GRPC_MDELEM_UNREF(channeld->http_scheme);
+ GRPC_MDELEM_UNREF(channeld->https_scheme);
+ GRPC_MDELEM_UNREF(channeld->grpc_scheme);
+ GRPC_MDELEM_UNREF(channeld->content_type);
+ GRPC_MDSTR_UNREF(channeld->path_key);
+ GRPC_MDSTR_UNREF(channeld->authority_key);
+ GRPC_MDSTR_UNREF(channeld->host_key);
}
const grpc_channel_filter grpc_http_server_filter = {
- hs_start_transport_op, grpc_channel_next_op, sizeof (call_data),
- init_call_elem, destroy_call_elem, sizeof (channel_data),
- init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer,
- "http-server"
-};
+ hs_start_transport_op, grpc_channel_next_op, sizeof(call_data),
+ init_call_elem, destroy_call_elem, sizeof(channel_data),
+ init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer,
+ "http-server"};
diff --git a/src/core/channel/noop_filter.c b/src/core/channel/noop_filter.c
index 7e566642f9..91b30d61ca 100644
--- a/src/core/channel/noop_filter.c
+++ b/src/core/channel/noop_filter.c
@@ -34,31 +34,25 @@
#include "src/core/channel/noop_filter.h"
#include <grpc/support/log.h>
-typedef struct call_data
-{
- int unused; /* C89 requires at least one struct element */
+typedef struct call_data {
+ int unused; /* C89 requires at least one struct element */
} call_data;
-typedef struct channel_data
-{
- int unused; /* C89 requires at least one struct element */
+typedef struct channel_data {
+ int unused; /* C89 requires at least one struct element */
} channel_data;
/* used to silence 'variable not used' warnings */
-static void
-ignore_unused (void *ignored)
-{
-}
+static void ignore_unused(void *ignored) {}
-static void
-noop_mutate_op (grpc_call_element * elem, grpc_transport_stream_op * op)
-{
+static void noop_mutate_op(grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
- ignore_unused (calld);
- ignore_unused (channeld);
+ ignore_unused(calld);
+ ignore_unused(channeld);
/* do nothing */
}
@@ -68,19 +62,19 @@ noop_mutate_op (grpc_call_element * elem, grpc_transport_stream_op * op)
- a network event (or similar) from below, to receive something
op contains type and call direction information, in addition to the data
that is being sent or received. */
-static void
-noop_start_transport_stream_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op)
-{
- noop_mutate_op (elem, op);
+static void noop_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
+ noop_mutate_op(elem, op);
/* pass control down the stack */
- grpc_call_next_op (exec_ctx, elem, op);
+ grpc_call_next_op(exec_ctx, elem, op);
}
/* Constructor for call_data */
-static void
-init_call_elem (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, const void *server_transport_data, grpc_transport_stream_op * initial_op)
-{
+static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ const void *server_transport_data,
+ grpc_transport_stream_op *initial_op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
@@ -88,51 +82,47 @@ init_call_elem (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, const void *
/* initialize members */
calld->unused = channeld->unused;
- if (initial_op)
- noop_mutate_op (elem, initial_op);
+ if (initial_op) noop_mutate_op(elem, initial_op);
}
/* Destructor for call_data */
-static void
-destroy_call_elem (grpc_exec_ctx * exec_ctx, grpc_call_element * elem)
-{
-}
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {}
/* Constructor for channel_data */
-static void
-init_channel_elem (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_channel * master, const grpc_channel_args * args, grpc_mdctx * mdctx, int is_first, int is_last)
-{
+static void init_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem, grpc_channel *master,
+ const grpc_channel_args *args, grpc_mdctx *mdctx,
+ int is_first, int is_last) {
/* grab pointers to our data from the channel element */
channel_data *channeld = elem->channel_data;
/* The first and the last filters tend to be implemented differently to
handle the case that there's no 'next' filter to call on the up or down
path */
- GPR_ASSERT (!is_first);
- GPR_ASSERT (!is_last);
+ GPR_ASSERT(!is_first);
+ GPR_ASSERT(!is_last);
/* initialize members */
channeld->unused = 0;
}
/* Destructor for channel data */
-static void
-destroy_channel_elem (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem)
-{
+static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem) {
/* grab pointers to our data from the channel element */
channel_data *channeld = elem->channel_data;
- ignore_unused (channeld);
+ ignore_unused(channeld);
}
-const grpc_channel_filter grpc_no_op_filter = { noop_start_transport_stream_op,
- grpc_channel_next_op,
- sizeof (call_data),
- init_call_elem,
- destroy_call_elem,
- sizeof (channel_data),
- init_channel_elem,
- destroy_channel_elem,
- grpc_call_next_get_peer,
- "no-op"
-};
+const grpc_channel_filter grpc_no_op_filter = {noop_start_transport_stream_op,
+ grpc_channel_next_op,
+ sizeof(call_data),
+ init_call_elem,
+ destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ grpc_call_next_get_peer,
+ "no-op"};