aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/channel
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-11-16 15:54:03 -0800
committerGravatar Craig Tiller <ctiller@google.com>2016-11-16 15:54:03 -0800
commit1094c32f358cef7df323952c117a6773809019c0 (patch)
tree5923eced0f83d9f5df5a04c77b919b22bc2d9dae /src/core/lib/channel
parentdf5b62755155883a888a783b783319e8a2f1110e (diff)
parentf2752ebb3b81cbad4e3b9ac0df5d7021c4659243 (diff)
Merge branch 'slice_with_exec_ctx' into slice_interning
Diffstat (limited to 'src/core/lib/channel')
-rw-r--r--src/core/lib/channel/channel_args.c6
-rw-r--r--src/core/lib/channel/channel_args.h8
-rw-r--r--src/core/lib/channel/channel_stack.c5
-rw-r--r--src/core/lib/channel/channel_stack.h3
-rw-r--r--src/core/lib/channel/compress_filter.c3
-rw-r--r--src/core/lib/channel/http_client_filter.c75
-rw-r--r--src/core/lib/channel/http_server_filter.c47
7 files changed, 106 insertions, 41 deletions
diff --git a/src/core/lib/channel/channel_args.c b/src/core/lib/channel/channel_args.c
index c956b0febc..1a099ac437 100644
--- a/src/core/lib/channel/channel_args.c
+++ b/src/core/lib/channel/channel_args.c
@@ -300,6 +300,12 @@ uint32_t grpc_channel_args_compression_algorithm_get_states(
}
}
+grpc_channel_args *grpc_channel_args_set_socket_mutator(
+ grpc_channel_args *a, grpc_socket_mutator *mutator) {
+ grpc_arg tmp = grpc_socket_mutator_to_arg(mutator);
+ return grpc_channel_args_copy_and_add(a, &tmp, 1);
+}
+
int grpc_channel_args_compare(const grpc_channel_args *a,
const grpc_channel_args *b) {
int c = GPR_ICMP(a->num_args, b->num_args);
diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h
index 5933133413..5c7d31f8bb 100644
--- a/src/core/lib/channel/channel_args.h
+++ b/src/core/lib/channel/channel_args.h
@@ -36,6 +36,7 @@
#include <grpc/compression.h>
#include <grpc/grpc.h>
+#include "src/core/lib/iomgr/socket_mutator.h"
// Channel args are intentionally immutable, to avoid the need for locking.
@@ -101,6 +102,13 @@ uint32_t grpc_channel_args_compression_algorithm_get_states(
int grpc_channel_args_compare(const grpc_channel_args *a,
const grpc_channel_args *b);
+/** Returns a channel arg instance with socket mutator added. The socket mutator
+ * will perform its mutate_fd method on all file descriptors used by the
+ * channel.
+ * If \a a is non-MULL, its args are copied. */
+grpc_channel_args *grpc_channel_args_set_socket_mutator(
+ grpc_channel_args *a, grpc_socket_mutator *mutator);
+
/** Returns the value of argument \a name from \a args, or NULL if not found. */
const grpc_arg *grpc_channel_args_find(const grpc_channel_args *args,
const char *name);
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c
index 947dff4cb3..285ac178af 100644
--- a/src/core/lib/channel/channel_stack.c
+++ b/src/core/lib/channel/channel_stack.c
@@ -162,7 +162,8 @@ grpc_error *grpc_call_stack_init(
grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg,
grpc_call_context_element *context, const void *transport_server_data,
- grpc_mdstr *path, gpr_timespec deadline, grpc_call_stack *call_stack) {
+ grpc_mdstr *path, gpr_timespec start_time, gpr_timespec deadline,
+ grpc_call_stack *call_stack) {
grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
grpc_call_element_args args;
size_t count = channel_stack->count;
@@ -179,7 +180,7 @@ grpc_error *grpc_call_stack_init(
/* init per-filter data */
grpc_error *first_error = GRPC_ERROR_NONE;
- args.start_time = gpr_now(GPR_CLOCK_MONOTONIC);
+ args.start_time = start_time;
for (i = 0; i < count; i++) {
args.call_stack = call_stack;
args.server_transport_data = transport_server_data;
diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h
index c3b662c969..004643d45f 100644
--- a/src/core/lib/channel/channel_stack.h
+++ b/src/core/lib/channel/channel_stack.h
@@ -231,7 +231,8 @@ grpc_error *grpc_call_stack_init(
grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg,
grpc_call_context_element *context, const void *transport_server_data,
- grpc_mdstr *path, gpr_timespec deadline, grpc_call_stack *call_stack);
+ grpc_mdstr *path, gpr_timespec start_time, gpr_timespec deadline,
+ grpc_call_stack *call_stack);
/* Set a pollset or a pollset_set for a call stack: must occur before the first
* op is started */
void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c
index dd496ee095..96188f01c4 100644
--- a/src/core/lib/channel/compress_filter.c
+++ b/src/core/lib/channel/compress_filter.c
@@ -83,7 +83,8 @@ typedef struct channel_data {
/** For each \a md element from the incoming metadata, filter out the entry for
* "grpc-encoding", using its value to populate the call data's
* compression_algorithm field. */
-static grpc_mdelem *compression_md_filter(void *user_data, grpc_mdelem *md) {
+static grpc_mdelem *compression_md_filter(grpc_exec_ctx *exec_ctx,
+ void *user_data, grpc_mdelem *md) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c
index 63afa4bf09..27064e0fb8 100644
--- a/src/core/lib/channel/http_client_filter.c
+++ b/src/core/lib/channel/http_client_filter.c
@@ -36,6 +36,7 @@
#include <grpc/support/string_util.h>
#include <string.h>
#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/slice/percent_encoding.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/static_metadata.h"
@@ -57,6 +58,7 @@ typedef struct call_data {
grpc_linked_mdelem payload_bin;
grpc_metadata_batch *recv_initial_metadata;
+ grpc_metadata_batch *recv_trailing_metadata;
uint8_t *payload_bytes;
/* Vars to read data off of send_message */
@@ -70,14 +72,16 @@ typedef struct call_data {
bool send_message_blocked;
/** Closure to call when finished with the hc_on_recv hook */
- grpc_closure *on_done_recv;
+ grpc_closure *on_done_recv_initial_metadata;
+ grpc_closure *on_done_recv_trailing_metadata;
grpc_closure *on_complete;
grpc_closure *post_send;
/** Receive closures are chained: we inject this closure as the on_done_recv
up-call on transport_op, and remember to call our on_done_recv member
after handling it. */
- grpc_closure hc_on_recv;
+ grpc_closure hc_on_recv_initial_metadata;
+ grpc_closure hc_on_recv_trailing_metadata;
grpc_closure hc_on_complete;
grpc_closure got_slice;
grpc_closure send_done;
@@ -89,13 +93,9 @@ typedef struct channel_data {
size_t max_payload_size_for_get;
} channel_data;
-typedef struct {
- grpc_call_element *elem;
- grpc_exec_ctx *exec_ctx;
-} client_recv_filter_args;
-
-static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) {
- client_recv_filter_args *a = user_data;
+static grpc_mdelem *client_recv_filter(grpc_exec_ctx *exec_ctx, void *user_data,
+ grpc_mdelem *md) {
+ grpc_call_element *elem = user_data;
if (md == GRPC_MDELEM_STATUS_200) {
return NULL;
} else if (md->key == GRPC_MDSTR_STATUS) {
@@ -104,9 +104,20 @@ static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) {
grpc_mdstr_as_c_string(md->value));
grpc_slice message = grpc_slice_from_copied_string(message_string);
gpr_free(message_string);
- grpc_call_element_send_close_with_message(a->exec_ctx, a->elem,
+ grpc_call_element_send_close_with_message(exec_ctx, elem,
GRPC_STATUS_CANCELLED, &message);
return NULL;
+ } else if (md->key == GRPC_MDSTR_GRPC_MESSAGE) {
+ grpc_slice pct_decoded_msg =
+ grpc_permissive_percent_decode_slice(md->value->slice);
+ if (grpc_slice_is_equivalent(pct_decoded_msg, md->value->slice)) {
+ grpc_slice_unref(pct_decoded_msg);
+ return md;
+ } else {
+ return grpc_mdelem_from_metadata_strings(
+ exec_ctx, GRPC_MDSTR_GRPC_MESSAGE,
+ grpc_mdstr_from_slice(exec_ctx, pct_decoded_msg));
+ }
} else if (md == GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC) {
return NULL;
} else if (md->key == GRPC_MDSTR_CONTENT_TYPE) {
@@ -130,16 +141,24 @@ static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) {
return md;
}
-static void hc_on_recv(grpc_exec_ctx *exec_ctx, void *user_data,
- grpc_error *error) {
+static void hc_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx,
+ void *user_data, grpc_error *error) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
- client_recv_filter_args a;
- a.elem = elem;
- a.exec_ctx = exec_ctx;
grpc_metadata_batch_filter(exec_ctx, calld->recv_initial_metadata,
- client_recv_filter, &a);
- calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, error);
+ client_recv_filter, elem);
+ grpc_closure_run(exec_ctx, calld->on_done_recv_initial_metadata,
+ GRPC_ERROR_REF(error));
+}
+
+static void hc_on_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
+ void *user_data, grpc_error *error) {
+ grpc_call_element *elem = user_data;
+ call_data *calld = elem->call_data;
+ grpc_metadata_batch_filter(exec_ctx, calld->recv_trailing_metadata,
+ client_recv_filter, elem);
+ grpc_closure_run(exec_ctx, calld->on_done_recv_trailing_metadata,
+ GRPC_ERROR_REF(error));
}
static void hc_on_complete(grpc_exec_ctx *exec_ctx, void *user_data,
@@ -160,7 +179,8 @@ static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error);
}
-static grpc_mdelem *client_strip_filter(void *user_data, grpc_mdelem *md) {
+static grpc_mdelem *client_strip_filter(grpc_exec_ctx *exec_ctx,
+ void *user_data, grpc_mdelem *md) {
/* eat the things we'd like to set ourselves */
if (md->key == GRPC_MDSTR_METHOD) return NULL;
if (md->key == GRPC_MDSTR_SCHEME) return NULL;
@@ -282,8 +302,15 @@ static void hc_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
if (op->recv_initial_metadata != NULL) {
/* substitute our callback for the higher callback */
calld->recv_initial_metadata = op->recv_initial_metadata;
- calld->on_done_recv = op->recv_initial_metadata_ready;
- op->recv_initial_metadata_ready = &calld->hc_on_recv;
+ calld->on_done_recv_initial_metadata = op->recv_initial_metadata_ready;
+ op->recv_initial_metadata_ready = &calld->hc_on_recv_initial_metadata;
+ }
+
+ if (op->recv_trailing_metadata != NULL) {
+ /* substitute our callback for the higher callback */
+ calld->recv_trailing_metadata = op->recv_trailing_metadata;
+ calld->on_done_recv_trailing_metadata = op->on_complete;
+ op->on_complete = &calld->hc_on_recv_trailing_metadata;
}
}
@@ -309,11 +336,15 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
call_data *calld = elem->call_data;
- calld->on_done_recv = NULL;
+ calld->on_done_recv_initial_metadata = NULL;
+ calld->on_done_recv_trailing_metadata = NULL;
calld->on_complete = NULL;
calld->payload_bytes = NULL;
grpc_slice_buffer_init(&calld->slices);
- grpc_closure_init(&calld->hc_on_recv, hc_on_recv, elem);
+ grpc_closure_init(&calld->hc_on_recv_initial_metadata,
+ hc_on_recv_initial_metadata, elem);
+ grpc_closure_init(&calld->hc_on_recv_trailing_metadata,
+ hc_on_recv_trailing_metadata, elem);
grpc_closure_init(&calld->hc_on_complete, hc_on_complete, elem);
grpc_closure_init(&calld->got_slice, got_slice, elem);
grpc_closure_init(&calld->send_done, send_done, elem);
diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c
index cb0fe230cf..db39020dca 100644
--- a/src/core/lib/channel/http_server_filter.c
+++ b/src/core/lib/channel/http_server_filter.c
@@ -37,6 +37,7 @@
#include <grpc/support/log.h>
#include <string.h>
#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/slice/percent_encoding.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/transport/static_metadata.h"
@@ -82,14 +83,28 @@ typedef struct call_data {
typedef struct channel_data { uint8_t unused; } channel_data;
-typedef struct {
- grpc_call_element *elem;
- grpc_exec_ctx *exec_ctx;
-} server_filter_args;
+static grpc_mdelem *server_filter_outgoing_metadata(grpc_exec_ctx *exec_ctx,
+ void *user_data,
+ grpc_mdelem *md) {
+ if (md->key == GRPC_MDSTR_GRPC_MESSAGE) {
+ grpc_slice pct_encoded_msg = grpc_percent_encode_slice(
+ md->value->slice, grpc_compatible_percent_encoding_unreserved_bytes);
+ if (grpc_slice_is_equivalent(pct_encoded_msg, md->value->slice)) {
+ grpc_slice_unref_internal(exec_ctx, pct_encoded_msg);
+ return md;
+ } else {
+ return grpc_mdelem_from_metadata_strings(
+ exec_ctx, GRPC_MDSTR_GRPC_MESSAGE,
+ grpc_mdstr_from_slice(exec_ctx, pct_encoded_msg));
+ }
+ } else {
+ return md;
+ }
+}
-static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
- server_filter_args *a = user_data;
- grpc_call_element *elem = a->elem;
+static grpc_mdelem *server_filter(grpc_exec_ctx *exec_ctx, void *user_data,
+ grpc_mdelem *md) {
+ grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
/* Check if it is one of the headers we care about. */
@@ -140,7 +155,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
/* swallow it and error everything out. */
/* TODO(klempner): We ought to generate more descriptive error messages
on the wire here. */
- grpc_call_element_send_cancel(a->exec_ctx, elem);
+ grpc_call_element_send_cancel(exec_ctx, elem);
return NULL;
} else if (md->key == GRPC_MDSTR_PATH) {
if (calld->seen_path) {
@@ -156,7 +171,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
/* translate host to :authority since :authority may be
omitted */
grpc_mdelem *authority = grpc_mdelem_from_metadata_strings(
- a->exec_ctx, GRPC_MDSTR_AUTHORITY, GRPC_MDSTR_REF(md->value));
+ exec_ctx, GRPC_MDSTR_AUTHORITY, GRPC_MDSTR_REF(md->value));
calld->seen_authority = 1;
return authority;
} else if (md->key == GRPC_MDSTR_GRPC_PAYLOAD_BIN) {
@@ -178,11 +193,8 @@ static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data,
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
if (err == GRPC_ERROR_NONE) {
- server_filter_args a;
- a.elem = elem;
- a.exec_ctx = exec_ctx;
grpc_metadata_batch_filter(exec_ctx, calld->recv_initial_metadata,
- server_filter, &a);
+ server_filter, elem);
/* Have we seen the required http2 transport headers?
(:method, :scheme, content-type, with :path and :authority covered
at the channel level right now) */
@@ -256,7 +268,7 @@ static void hs_recv_message_ready(grpc_exec_ctx *exec_ctx, void *user_data,
}
}
-static void hs_mutate_op(grpc_call_element *elem,
+static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
@@ -292,6 +304,11 @@ static void hs_mutate_op(grpc_call_element *elem,
op->on_complete = &calld->hs_on_complete;
}
}
+
+ if (op->send_trailing_metadata) {
+ grpc_metadata_batch_filter(exec_ctx, op->send_trailing_metadata,
+ server_filter_outgoing_metadata, elem);
+ }
}
static void hs_start_transport_op(grpc_exec_ctx *exec_ctx,
@@ -299,7 +316,7 @@ static void hs_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_transport_stream_op *op) {
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
GPR_TIMER_BEGIN("hs_start_transport_op", 0);
- hs_mutate_op(elem, op);
+ hs_mutate_op(exec_ctx, elem, op);
grpc_call_next_op(exec_ctx, elem, op);
GPR_TIMER_END("hs_start_transport_op", 0);
}