aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/channel
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-08-29 12:16:56 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-08-29 12:16:56 -0700
commitd9b82bdecbf44cc7f9116be45a862457a727e87f (patch)
tree3d1894476ca48867e5ffe799301112f0735a8859 /src/core/lib/channel
parent016ad385e776cd41c0021739435c7bceedd9555c (diff)
parent9811915ba3fa1ccdf44b6a70fe1b1dd4782cd508 (diff)
Merge github.com:grpc/grpc into grpc_millis
Diffstat (limited to 'src/core/lib/channel')
-rw-r--r--src/core/lib/channel/channel_args.c98
-rw-r--r--src/core/lib/channel/channel_args.h31
-rw-r--r--src/core/lib/channel/channel_stack.c16
-rw-r--r--src/core/lib/channel/channel_stack.h11
-rw-r--r--src/core/lib/channel/connected_channel.c92
5 files changed, 213 insertions, 35 deletions
diff --git a/src/core/lib/channel/channel_args.c b/src/core/lib/channel/channel_args.c
index 8fdef0bc64..02db798b5c 100644
--- a/src/core/lib/channel/channel_args.c
+++ b/src/core/lib/channel/channel_args.c
@@ -221,6 +221,21 @@ grpc_compression_algorithm grpc_channel_args_get_compression_algorithm(
return GRPC_COMPRESS_NONE;
}
+grpc_stream_compression_algorithm
+grpc_channel_args_get_stream_compression_algorithm(const grpc_channel_args *a) {
+ size_t i;
+ if (a == NULL) return 0;
+ for (i = 0; i < a->num_args; ++i) {
+ if (a->args[i].type == GRPC_ARG_INTEGER &&
+ !strcmp(GRPC_STREAM_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM,
+ a->args[i].key)) {
+ return (grpc_stream_compression_algorithm)a->args[i].value.integer;
+ break;
+ }
+ }
+ return GRPC_STREAM_COMPRESS_NONE;
+}
+
grpc_channel_args *grpc_channel_args_set_compression_algorithm(
grpc_channel_args *a, grpc_compression_algorithm algorithm) {
GPR_ASSERT(algorithm < GRPC_COMPRESS_ALGORITHMS_COUNT);
@@ -231,6 +246,16 @@ grpc_channel_args *grpc_channel_args_set_compression_algorithm(
return grpc_channel_args_copy_and_add(a, &tmp, 1);
}
+grpc_channel_args *grpc_channel_args_set_stream_compression_algorithm(
+ grpc_channel_args *a, grpc_stream_compression_algorithm algorithm) {
+ GPR_ASSERT(algorithm < GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT);
+ grpc_arg tmp;
+ tmp.type = GRPC_ARG_INTEGER;
+ tmp.key = GRPC_STREAM_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM;
+ tmp.value.integer = algorithm;
+ return grpc_channel_args_copy_and_add(a, &tmp, 1);
+}
+
/** Returns 1 if the argument for compression algorithm's enabled states bitset
* was found in \a a, returning the arg's value in \a states. Otherwise, returns
* 0. */
@@ -251,6 +276,26 @@ static int find_compression_algorithm_states_bitset(const grpc_channel_args *a,
return 0; /* GPR_FALSE */
}
+/** Returns 1 if the argument for compression algorithm's enabled states bitset
+ * was found in \a a, returning the arg's value in \a states. Otherwise, returns
+ * 0. */
+static int find_stream_compression_algorithm_states_bitset(
+ const grpc_channel_args *a, int **states_arg) {
+ if (a != NULL) {
+ size_t i;
+ for (i = 0; i < a->num_args; ++i) {
+ if (a->args[i].type == GRPC_ARG_INTEGER &&
+ !strcmp(GRPC_STREAM_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET,
+ a->args[i].key)) {
+ *states_arg = &a->args[i].value.integer;
+ **states_arg |= 0x1; /* forcefully enable support for no compression */
+ return 1;
+ }
+ }
+ }
+ return 0; /* GPR_FALSE */
+}
+
grpc_channel_args *grpc_channel_args_compression_algorithm_set_state(
grpc_exec_ctx *exec_ctx, grpc_channel_args **a,
grpc_compression_algorithm algorithm, int state) {
@@ -292,6 +337,48 @@ grpc_channel_args *grpc_channel_args_compression_algorithm_set_state(
return result;
}
+grpc_channel_args *grpc_channel_args_stream_compression_algorithm_set_state(
+ grpc_exec_ctx *exec_ctx, grpc_channel_args **a,
+ grpc_stream_compression_algorithm algorithm, int state) {
+ int *states_arg = NULL;
+ grpc_channel_args *result = *a;
+ const int states_arg_found =
+ find_stream_compression_algorithm_states_bitset(*a, &states_arg);
+
+ if (grpc_channel_args_get_stream_compression_algorithm(*a) == algorithm &&
+ state == 0) {
+ char *algo_name = NULL;
+ GPR_ASSERT(grpc_stream_compression_algorithm_name(algorithm, &algo_name) !=
+ 0);
+ gpr_log(GPR_ERROR,
+ "Tried to disable default stream compression algorithm '%s'. The "
+ "operation has been ignored.",
+ algo_name);
+ } else if (states_arg_found) {
+ if (state != 0) {
+ GPR_BITSET((unsigned *)states_arg, algorithm);
+ } else if (algorithm != GRPC_STREAM_COMPRESS_NONE) {
+ GPR_BITCLEAR((unsigned *)states_arg, algorithm);
+ }
+ } else {
+ /* create a new arg */
+ grpc_arg tmp;
+ tmp.type = GRPC_ARG_INTEGER;
+ tmp.key = GRPC_STREAM_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET;
+ /* all enabled by default */
+ tmp.value.integer = (1u << GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) - 1;
+ if (state != 0) {
+ GPR_BITSET((unsigned *)&tmp.value.integer, algorithm);
+ } else if (algorithm != GRPC_STREAM_COMPRESS_NONE) {
+ GPR_BITCLEAR((unsigned *)&tmp.value.integer, algorithm);
+ }
+ result = grpc_channel_args_copy_and_add(*a, &tmp, 1);
+ grpc_channel_args_destroy(exec_ctx, *a);
+ *a = result;
+ }
+ return result;
+}
+
uint32_t grpc_channel_args_compression_algorithm_get_states(
const grpc_channel_args *a) {
int *states_arg;
@@ -302,6 +389,17 @@ uint32_t grpc_channel_args_compression_algorithm_get_states(
}
}
+uint32_t grpc_channel_args_stream_compression_algorithm_get_states(
+ const grpc_channel_args *a) {
+ int *states_arg;
+ if (find_stream_compression_algorithm_states_bitset(a, &states_arg)) {
+ return (uint32_t)*states_arg;
+ } else {
+ return (1u << GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) -
+ 1; /* All algs. enabled */
+ }
+}
+
grpc_channel_args *grpc_channel_args_set_socket_mutator(
grpc_channel_args *a, grpc_socket_mutator *mutator) {
grpc_arg tmp = grpc_socket_mutator_to_arg(mutator);
diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h
index f649a8d9ec..0599e189c3 100644
--- a/src/core/lib/channel/channel_args.h
+++ b/src/core/lib/channel/channel_args.h
@@ -59,12 +59,24 @@ void grpc_channel_args_destroy(grpc_exec_ctx *exec_ctx, grpc_channel_args *a);
grpc_compression_algorithm grpc_channel_args_get_compression_algorithm(
const grpc_channel_args *a);
+/** Returns the stream compression algorithm set in \a a. */
+grpc_stream_compression_algorithm
+grpc_channel_args_get_stream_compression_algorithm(const grpc_channel_args *a);
+
/** Returns a channel arg instance with compression enabled. If \a a is
* non-NULL, its args are copied. N.B. GRPC_COMPRESS_NONE disables compression
* for the channel. */
grpc_channel_args *grpc_channel_args_set_compression_algorithm(
grpc_channel_args *a, grpc_compression_algorithm algorithm);
+/** Returns a channel arg instance with stream compression enabled. If \a a is
+ * non-NULL, its args are copied. N.B. GRPC_STREAM_COMPRESS_NONE disables
+ * stream compression for the channel. If a value other than
+ * GRPC_STREAM_COMPRESS_NONE is set, it takes precedence over message-wise
+ * compression algorithms. */
+grpc_channel_args *grpc_channel_args_set_stream_compression_algorithm(
+ grpc_channel_args *a, grpc_stream_compression_algorithm algorithm);
+
/** Sets the support for the given compression algorithm. By default, all
* compression algorithms are enabled. It's an error to disable an algorithm set
* by grpc_channel_args_set_compression_algorithm.
@@ -76,6 +88,17 @@ grpc_channel_args *grpc_channel_args_compression_algorithm_set_state(
grpc_exec_ctx *exec_ctx, grpc_channel_args **a,
grpc_compression_algorithm algorithm, int enabled);
+/** Sets the support for the given stream compression algorithm. By default, all
+ * stream compression algorithms are enabled. It's an error to disable an
+ * algorithm set by grpc_channel_args_set_stream_compression_algorithm.
+ *
+ * Returns an instance with the updated algorithm states. The \a a pointer is
+ * modified to point to the returned instance (which may be different from the
+ * input value of \a a). */
+grpc_channel_args *grpc_channel_args_stream_compression_algorithm_set_state(
+ grpc_exec_ctx *exec_ctx, grpc_channel_args **a,
+ grpc_stream_compression_algorithm algorithm, int enabled);
+
/** Returns the bitset representing the support state (true for enabled, false
* for disabled) for compression algorithms.
*
@@ -84,6 +107,14 @@ grpc_channel_args *grpc_channel_args_compression_algorithm_set_state(
uint32_t grpc_channel_args_compression_algorithm_get_states(
const grpc_channel_args *a);
+/** Returns the bitset representing the support state (true for enabled, false
+ * for disabled) for stream compression algorithms.
+ *
+ * The i-th bit of the returned bitset corresponds to the i-th entry in the
+ * grpc_stream_compression_algorithm enum. */
+uint32_t grpc_channel_args_stream_compression_algorithm_get_states(
+ const grpc_channel_args *a);
+
int grpc_channel_args_compare(const grpc_channel_args *a,
const grpc_channel_args *b);
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c
index 0f8e33c4be..775c8bc667 100644
--- a/src/core/lib/channel/channel_stack.c
+++ b/src/core/lib/channel/channel_stack.c
@@ -233,15 +233,10 @@ void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack,
void grpc_call_next_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op_batch *op) {
grpc_call_element *next_elem = elem + 1;
+ GRPC_CALL_LOG_OP(GPR_INFO, next_elem, op);
next_elem->filter->start_transport_stream_op_batch(exec_ctx, next_elem, op);
}
-char *grpc_call_next_get_peer(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {
- grpc_call_element *next_elem = elem + 1;
- return next_elem->filter->get_peer(exec_ctx, next_elem);
-}
-
void grpc_channel_next_get_info(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
const grpc_channel_info *channel_info) {
@@ -265,12 +260,3 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) {
return (grpc_call_stack *)((char *)(elem)-ROUND_UP_TO_ALIGNMENT_SIZE(
sizeof(grpc_call_stack)));
}
-
-void grpc_call_element_signal_error(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_error *error) {
- grpc_transport_stream_op_batch *op = grpc_make_transport_stream_op(NULL);
- op->cancel_stream = true;
- op->payload->cancel_stream.cancel_error = error;
- elem->filter->start_transport_stream_op_batch(exec_ctx, elem, op);
-}
diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h
index 19506e492b..e616a51024 100644
--- a/src/core/lib/channel/channel_stack.h
+++ b/src/core/lib/channel/channel_stack.h
@@ -40,6 +40,7 @@
#include <grpc/support/time.h>
#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/support/arena.h"
#include "src/core/lib/transport/transport.h"
@@ -71,6 +72,7 @@ typedef struct {
gpr_timespec start_time;
grpc_millis deadline;
gpr_arena *arena;
+ grpc_call_combiner *call_combiner;
} grpc_call_element_args;
typedef struct {
@@ -150,9 +152,6 @@ typedef struct {
void (*destroy_channel_elem)(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem);
- /* Implement grpc_call_get_peer() */
- char *(*get_peer)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem);
-
/* Implement grpc_channel_get_info() */
void (*get_channel_info)(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
const grpc_channel_info *channel_info);
@@ -271,8 +270,6 @@ void grpc_call_next_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
stack */
void grpc_channel_next_op(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
grpc_transport_op *op);
-/* Pass through a request to get_peer to the next child element */
-char *grpc_call_next_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem);
/* Pass through a request to get_channel_info() to the next child element */
void grpc_channel_next_get_info(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
@@ -288,10 +285,6 @@ void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
grpc_call_element *elem,
grpc_transport_stream_op_batch *op);
-void grpc_call_element_signal_error(grpc_exec_ctx *exec_ctx,
- grpc_call_element *cur_elem,
- grpc_error *error);
-
extern grpc_tracer_flag grpc_trace_channel;
#define GRPC_CALL_LOG_OP(sev, elem, op) \
diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c
index af06ca802e..8285226fc4 100644
--- a/src/core/lib/channel/connected_channel.c
+++ b/src/core/lib/channel/connected_channel.c
@@ -36,7 +36,57 @@ typedef struct connected_channel_channel_data {
grpc_transport *transport;
} channel_data;
-typedef struct connected_channel_call_data { void *unused; } call_data;
+typedef struct {
+ grpc_closure closure;
+ grpc_closure *original_closure;
+ grpc_call_combiner *call_combiner;
+ const char *reason;
+} callback_state;
+
+typedef struct connected_channel_call_data {
+ grpc_call_combiner *call_combiner;
+ // Closures used for returning results on the call combiner.
+ callback_state on_complete[6]; // Max number of pending batches.
+ callback_state recv_initial_metadata_ready;
+ callback_state recv_message_ready;
+} call_data;
+
+static void run_in_call_combiner(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ callback_state *state = (callback_state *)arg;
+ GRPC_CALL_COMBINER_START(exec_ctx, state->call_combiner,
+ state->original_closure, GRPC_ERROR_REF(error),
+ state->reason);
+}
+
+static void run_cancel_in_call_combiner(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ run_in_call_combiner(exec_ctx, arg, error);
+ gpr_free(arg);
+}
+
+static void intercept_callback(call_data *calld, callback_state *state,
+ bool free_when_done, const char *reason,
+ grpc_closure **original_closure) {
+ state->original_closure = *original_closure;
+ state->call_combiner = calld->call_combiner;
+ state->reason = reason;
+ *original_closure = GRPC_CLOSURE_INIT(
+ &state->closure,
+ free_when_done ? run_cancel_in_call_combiner : run_in_call_combiner,
+ state, grpc_schedule_on_exec_ctx);
+}
+
+static callback_state *get_state_for_batch(
+ call_data *calld, grpc_transport_stream_op_batch *batch) {
+ if (batch->send_initial_metadata) return &calld->on_complete[0];
+ if (batch->send_message) return &calld->on_complete[1];
+ if (batch->send_trailing_metadata) return &calld->on_complete[2];
+ if (batch->recv_initial_metadata) return &calld->on_complete[3];
+ if (batch->recv_message) return &calld->on_complete[4];
+ if (batch->recv_trailing_metadata) return &calld->on_complete[5];
+ GPR_UNREACHABLE_CODE(return NULL);
+}
/* We perform a small hack to locate transport data alongside the connected
channel data in call allocations, to allow everything to be pulled in minimal
@@ -49,13 +99,38 @@ typedef struct connected_channel_call_data { void *unused; } call_data;
into transport stream operations */
static void con_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
+ grpc_transport_stream_op_batch *batch) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
+ if (batch->recv_initial_metadata) {
+ callback_state *state = &calld->recv_initial_metadata_ready;
+ intercept_callback(
+ calld, state, false, "recv_initial_metadata_ready",
+ &batch->payload->recv_initial_metadata.recv_initial_metadata_ready);
+ }
+ if (batch->recv_message) {
+ callback_state *state = &calld->recv_message_ready;
+ intercept_callback(calld, state, false, "recv_message_ready",
+ &batch->payload->recv_message.recv_message_ready);
+ }
+ if (batch->cancel_stream) {
+ // There can be more than one cancellation batch in flight at any
+ // given time, so we can't just pick out a fixed index into
+ // calld->on_complete like we can for the other ops. However,
+ // cancellation isn't in the fast path, so we just allocate a new
+ // closure for each one.
+ callback_state *state = (callback_state *)gpr_malloc(sizeof(*state));
+ intercept_callback(calld, state, true, "on_complete (cancel_stream)",
+ &batch->on_complete);
+ } else {
+ callback_state *state = get_state_for_batch(calld, batch);
+ intercept_callback(calld, state, false, "on_complete", &batch->on_complete);
+ }
grpc_transport_perform_stream_op(exec_ctx, chand->transport,
- TRANSPORT_STREAM_FROM_CALL_DATA(calld), op);
+ TRANSPORT_STREAM_FROM_CALL_DATA(calld),
+ batch);
+ GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner,
+ "passed batch to transport");
}
static void con_start_transport_op(grpc_exec_ctx *exec_ctx,
@@ -71,6 +146,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
const grpc_call_element_args *args) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
+ calld->call_combiner = args->call_combiner;
int r = grpc_transport_init_stream(
exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld),
&args->call_stack->refcount, args->server_transport_data, args->arena);
@@ -118,11 +194,6 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
}
}
-static char *con_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
- channel_data *chand = elem->channel_data;
- return grpc_transport_get_peer(exec_ctx, chand->transport);
-}
-
/* No-op. */
static void con_get_channel_info(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
@@ -138,7 +209,6 @@ const grpc_channel_filter grpc_connected_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
- con_get_peer,
con_get_channel_info,
"connected",
};