aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/channel
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/channel')
-rw-r--r--src/core/channel/channel_stack.c5
-rw-r--r--src/core/channel/channel_stack.h5
-rw-r--r--src/core/channel/client_channel.c199
-rw-r--r--src/core/channel/client_channel.h14
-rw-r--r--src/core/channel/compress_filter.c25
-rw-r--r--src/core/channel/connected_channel.c6
-rw-r--r--src/core/channel/http_client_filter.c64
-rw-r--r--src/core/channel/http_server_filter.c19
-rw-r--r--src/core/channel/noop_filter.c1
9 files changed, 293 insertions, 45 deletions
diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c
index e38dcb58b7..cd7c182ef2 100644
--- a/src/core/channel/channel_stack.c
+++ b/src/core/channel/channel_stack.c
@@ -191,6 +191,11 @@ void grpc_call_next_op(grpc_call_element *elem, grpc_transport_stream_op *op) {
next_elem->filter->start_transport_stream_op(next_elem, op);
}
+char *grpc_call_next_get_peer(grpc_call_element *elem) {
+ grpc_call_element *next_elem = elem + 1;
+ return next_elem->filter->get_peer(next_elem);
+}
+
void grpc_channel_next_op(grpc_channel_element *elem, grpc_transport_op *op) {
grpc_channel_element *next_elem = elem + 1;
next_elem->filter->start_transport_op(next_elem, op);
diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h
index 785be8925b..4a608b956e 100644
--- a/src/core/channel/channel_stack.h
+++ b/src/core/channel/channel_stack.h
@@ -104,6 +104,9 @@ typedef struct {
The filter does not need to do any chaining */
void (*destroy_channel_elem)(grpc_channel_element *elem);
+ /* Implement grpc_call_get_peer() */
+ char *(*get_peer)(grpc_call_element *elem);
+
/* The name of this filter */
const char *name;
} grpc_channel_filter;
@@ -173,6 +176,8 @@ void grpc_call_next_op(grpc_call_element *elem, grpc_transport_stream_op *op);
/* Call the next operation (depending on call directionality) in a channel
stack */
void grpc_channel_next_op(grpc_channel_element *elem, grpc_transport_op *op);
+/* Pass through a request to get_peer to the next child element */
+char *grpc_call_next_get_peer(grpc_call_element *elem);
/* Given the top element of a channel stack, get the channel stack itself */
grpc_channel_stack *grpc_channel_stack_from_top_element(
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index c1aa580b2d..a293c93ec6 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -40,7 +40,6 @@
#include "src/core/channel/connected_channel.h"
#include "src/core/surface/channel.h"
#include "src/core/iomgr/iomgr.h"
-#include "src/core/iomgr/pollset_set.h"
#include "src/core/support/string.h"
#include "src/core/transport/connectivity_state.h"
#include <grpc/support/alloc.h>
@@ -57,6 +56,8 @@ typedef struct {
grpc_mdctx *mdctx;
/** resolver for this channel */
grpc_resolver *resolver;
+ /** have we started resolving this channel */
+ int started_resolving;
/** master channel - the grpc_channel instance that ultimately owns
this channel_data via its channel stack.
We occasionally use this to bump the refcount on the master channel
@@ -77,8 +78,22 @@ typedef struct {
grpc_iomgr_closure on_config_changed;
/** connectivity state being tracked */
grpc_connectivity_state_tracker state_tracker;
+ /** when an lb_policy arrives, should we try to exit idle */
+ int exit_idle_when_lb_policy_arrives;
+ /** pollset_set of interested parties in a new connection */
+ grpc_pollset_set pollset_set;
} channel_data;
+/** We create one watcher for each new lb_policy that is returned from a resolver,
+ to watch for state changes from the lb_policy. When a state change is seen, we
+ update the channel, and create a new watcher */
+typedef struct {
+ channel_data *chand;
+ grpc_iomgr_closure on_changed;
+ grpc_connectivity_state state;
+ grpc_lb_policy *lb_policy;
+} lb_policy_connectivity_watcher;
+
typedef enum {
CALL_CREATED,
CALL_WAITING_FOR_SEND,
@@ -265,6 +280,26 @@ static grpc_iomgr_closure *merge_into_waiting_op(
return consumed_op;
}
+static char *cc_get_peer(grpc_call_element *elem) {
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ grpc_subchannel_call *subchannel_call;
+ char *result;
+
+ gpr_mu_lock(&calld->mu_state);
+ if (calld->state == CALL_ACTIVE) {
+ subchannel_call = calld->subchannel_call;
+ GRPC_SUBCHANNEL_CALL_REF(subchannel_call, "get_peer");
+ gpr_mu_unlock(&calld->mu_state);
+ result = grpc_subchannel_call_get_peer(subchannel_call);
+ GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "get_peer");
+ return result;
+ } else {
+ gpr_mu_unlock(&calld->mu_state);
+ return grpc_channel_get_target(chand->master);
+ }
+}
+
static void perform_transport_stream_op(grpc_call_element *elem,
grpc_transport_stream_op *op,
int continuation) {
@@ -365,6 +400,13 @@ static void perform_transport_stream_op(grpc_call_element *elem,
} else if (chand->resolver != NULL) {
calld->state = CALL_WAITING_FOR_CONFIG;
add_to_lb_policy_wait_queue_locked_state_config(elem);
+ if (!chand->started_resolving && chand->resolver != NULL) {
+ GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
+ chand->started_resolving = 1;
+ grpc_resolver_next(chand->resolver,
+ &chand->incoming_configuration,
+ &chand->on_config_changed);
+ }
gpr_mu_unlock(&chand->mu_config);
gpr_mu_unlock(&calld->mu_state);
} else {
@@ -388,16 +430,53 @@ static void cc_start_transport_stream_op(grpc_call_element *elem,
perform_transport_stream_op(elem, op, 0);
}
+static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, grpc_connectivity_state current_state);
+
+static void on_lb_policy_state_changed(void *arg, int iomgr_success) {
+ lb_policy_connectivity_watcher *w = arg;
+
+ gpr_mu_lock(&w->chand->mu_config);
+ /* check if the notification is for a stale policy */
+ if (w->lb_policy == w->chand->lb_policy) {
+ grpc_connectivity_state_set(&w->chand->state_tracker, w->state,
+ "lb_changed");
+ if (w->state != GRPC_CHANNEL_FATAL_FAILURE) {
+ watch_lb_policy(w->chand, w->lb_policy, w->state);
+ }
+ }
+ gpr_mu_unlock(&w->chand->mu_config);
+
+ GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy");
+ gpr_free(w);
+}
+
+static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, grpc_connectivity_state current_state) {
+ lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
+ GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy");
+
+ w->chand = chand;
+ grpc_iomgr_closure_init(&w->on_changed, on_lb_policy_state_changed, w);
+ w->state = current_state;
+ w->lb_policy = lb_policy;
+ grpc_lb_policy_notify_on_state_change(lb_policy, &w->state, &w->on_changed);
+}
+
static void cc_on_config_changed(void *arg, int iomgr_success) {
channel_data *chand = arg;
grpc_lb_policy *lb_policy = NULL;
grpc_lb_policy *old_lb_policy;
grpc_resolver *old_resolver;
grpc_iomgr_closure *wakeup_closures = NULL;
+ grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
+ int exit_idle = 0;
if (chand->incoming_configuration != NULL) {
lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration);
- GRPC_LB_POLICY_REF(lb_policy, "channel");
+ if (lb_policy != NULL) {
+ GRPC_LB_POLICY_REF(lb_policy, "channel");
+ GRPC_LB_POLICY_REF(lb_policy, "config_change");
+ state = grpc_lb_policy_check_connectivity(lb_policy);
+ }
grpc_client_config_unref(chand->incoming_configuration);
}
@@ -411,13 +490,12 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
wakeup_closures = chand->waiting_for_config_closures;
chand->waiting_for_config_closures = NULL;
}
- gpr_mu_unlock(&chand->mu_config);
-
- if (old_lb_policy) {
- GRPC_LB_POLICY_UNREF(old_lb_policy, "channel");
+ if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) {
+ GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
+ exit_idle = 1;
+ chand->exit_idle_when_lb_policy_arrives = 0;
}
- gpr_mu_lock(&chand->mu_config);
if (iomgr_success && chand->resolver) {
grpc_resolver *resolver = chand->resolver;
GRPC_RESOLVER_REF(resolver, "channel-next");
@@ -426,11 +504,16 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
grpc_resolver_next(resolver, &chand->incoming_configuration,
&chand->on_config_changed);
GRPC_RESOLVER_UNREF(resolver, "channel-next");
+ grpc_connectivity_state_set(&chand->state_tracker, state,
+ "new_lb+resolver");
+ if (lb_policy != NULL) {
+ watch_lb_policy(chand, lb_policy, state);
+ }
} else {
old_resolver = chand->resolver;
chand->resolver = NULL;
grpc_connectivity_state_set(&chand->state_tracker,
- GRPC_CHANNEL_FATAL_FAILURE);
+ GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone");
gpr_mu_unlock(&chand->mu_config);
if (old_resolver != NULL) {
grpc_resolver_shutdown(old_resolver);
@@ -438,12 +521,24 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
}
}
+ if (exit_idle) {
+ grpc_lb_policy_exit_idle(lb_policy);
+ GRPC_LB_POLICY_UNREF(lb_policy, "exit_idle");
+ }
+
+ if (old_lb_policy != NULL) {
+ GRPC_LB_POLICY_UNREF(old_lb_policy, "channel");
+ }
+
while (wakeup_closures) {
grpc_iomgr_closure *next = wakeup_closures->next;
- grpc_iomgr_add_callback(wakeup_closures);
+ wakeup_closures->cb(wakeup_closures->cb_arg, 1);
wakeup_closures = next;
}
+ if (lb_policy != NULL) {
+ GRPC_LB_POLICY_UNREF(lb_policy, "config_change");
+ }
GRPC_CHANNEL_INTERNAL_UNREF(chand->master, "resolver");
}
@@ -467,20 +562,22 @@ static void cc_start_transport_op(grpc_channel_element *elem,
op->connectivity_state = NULL;
}
+ if (!is_empty(op, sizeof(*op))) {
+ lb_policy = chand->lb_policy;
+ if (lb_policy) {
+ GRPC_LB_POLICY_REF(lb_policy, "broadcast");
+ }
+ }
+
if (op->disconnect && chand->resolver != NULL) {
grpc_connectivity_state_set(&chand->state_tracker,
- GRPC_CHANNEL_FATAL_FAILURE);
+ GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
destroy_resolver = chand->resolver;
chand->resolver = NULL;
if (chand->lb_policy != NULL) {
grpc_lb_policy_shutdown(chand->lb_policy);
- }
- }
-
- if (!is_empty(op, sizeof(*op))) {
- lb_policy = chand->lb_policy;
- if (lb_policy) {
- GRPC_LB_POLICY_REF(lb_policy, "broadcast");
+ GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
+ chand->lb_policy = NULL;
}
}
gpr_mu_unlock(&chand->mu_config);
@@ -561,10 +658,11 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
gpr_mu_init(&chand->mu_config);
chand->mdctx = metadata_context;
chand->master = master;
+ grpc_pollset_set_init(&chand->pollset_set);
grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed,
chand);
- grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE);
+ grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel");
}
/* Destructor for channel_data */
@@ -578,6 +676,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
if (chand->lb_policy != NULL) {
GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
}
+ grpc_connectivity_state_destroy(&chand->state_tracker);
+ grpc_pollset_set_destroy(&chand->pollset_set);
gpr_mu_destroy(&chand->mu_config);
}
@@ -590,6 +690,7 @@ const grpc_channel_filter grpc_client_channel_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
+ cc_get_peer,
"client-channel",
};
@@ -598,10 +699,66 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
/* post construction initialization: set the transport setup pointer */
grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
channel_data *chand = elem->channel_data;
+ gpr_mu_lock(&chand->mu_config);
GPR_ASSERT(!chand->resolver);
chand->resolver = resolver;
- GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
GRPC_RESOLVER_REF(resolver, "channel");
- grpc_resolver_next(resolver, &chand->incoming_configuration,
- &chand->on_config_changed);
+ if (chand->waiting_for_config_closures != NULL ||
+ chand->exit_idle_when_lb_policy_arrives) {
+ chand->started_resolving = 1;
+ GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
+ grpc_resolver_next(resolver, &chand->incoming_configuration,
+ &chand->on_config_changed);
+ }
+ gpr_mu_unlock(&chand->mu_config);
+}
+
+grpc_connectivity_state grpc_client_channel_check_connectivity_state(
+ grpc_channel_element *elem, int try_to_connect) {
+ channel_data *chand = elem->channel_data;
+ grpc_connectivity_state out;
+ gpr_mu_lock(&chand->mu_config);
+ out = grpc_connectivity_state_check(&chand->state_tracker);
+ if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
+ if (chand->lb_policy != NULL) {
+ grpc_lb_policy_exit_idle(chand->lb_policy);
+ } else {
+ chand->exit_idle_when_lb_policy_arrives = 1;
+ if (!chand->started_resolving && chand->resolver != NULL) {
+ GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
+ chand->started_resolving = 1;
+ grpc_resolver_next(chand->resolver, &chand->incoming_configuration,
+ &chand->on_config_changed);
+ }
+ }
+ }
+ gpr_mu_unlock(&chand->mu_config);
+ return out;
+}
+
+void grpc_client_channel_watch_connectivity_state(
+ grpc_channel_element *elem, grpc_connectivity_state *state,
+ grpc_iomgr_closure *on_complete) {
+ channel_data *chand = elem->channel_data;
+ gpr_mu_lock(&chand->mu_config);
+ grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, state,
+ on_complete);
+ gpr_mu_unlock(&chand->mu_config);
+}
+
+grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(grpc_channel_element *elem) {
+ channel_data *chand = elem->channel_data;
+ return &chand->pollset_set;
+}
+
+void grpc_client_channel_add_interested_party(grpc_channel_element *elem,
+ grpc_pollset *pollset) {
+ channel_data *chand = elem->channel_data;
+ grpc_pollset_set_add_pollset(&chand->pollset_set, pollset);
+}
+
+void grpc_client_channel_del_interested_party(grpc_channel_element *elem,
+ grpc_pollset *pollset) {
+ channel_data *chand = elem->channel_data;
+ grpc_pollset_set_del_pollset(&chand->pollset_set, pollset);
}
diff --git a/src/core/channel/client_channel.h b/src/core/channel/client_channel.h
index fd2be46145..cd81294eb3 100644
--- a/src/core/channel/client_channel.h
+++ b/src/core/channel/client_channel.h
@@ -52,4 +52,18 @@ extern const grpc_channel_filter grpc_client_channel_filter;
void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
grpc_resolver *resolver);
+grpc_connectivity_state grpc_client_channel_check_connectivity_state(
+ grpc_channel_element *elem, int try_to_connect);
+
+void grpc_client_channel_watch_connectivity_state(
+ grpc_channel_element *elem, grpc_connectivity_state *state,
+ grpc_iomgr_closure *on_complete);
+
+grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(grpc_channel_element *elem);
+
+void grpc_client_channel_add_interested_party(grpc_channel_element *channel,
+ grpc_pollset *pollset);
+void grpc_client_channel_del_interested_party(grpc_channel_element *channel,
+ grpc_pollset *pollset);
+
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_CHANNEL_H */
diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c
index 14cb3da62d..8963c13b0f 100644
--- a/src/core/channel/compress_filter.c
+++ b/src/core/channel/compress_filter.c
@@ -174,6 +174,8 @@ static void process_send_ops(grpc_call_element *elem,
size_t i;
int did_compress = 0;
+ /* In streaming calls, we need to reset the previously accumulated slices */
+ gpr_slice_buffer_reset_and_unref(&calld->slices);
for (i = 0; i < send_ops->nops; ++i) {
grpc_stream_op *sop = &send_ops->ops[i];
switch (sop->type) {
@@ -200,9 +202,9 @@ static void process_send_ops(grpc_call_element *elem,
channeld->default_compression_algorithm;
calld->has_compression_algorithm = 1; /* GPR_TRUE */
}
- grpc_metadata_batch_add_head(
+ grpc_metadata_batch_add_tail(
&(sop->data.metadata), &calld->compression_algorithm_storage,
- grpc_mdelem_ref(channeld->mdelem_compression_algorithms
+ GRPC_MDELEM_REF(channeld->mdelem_compression_algorithms
[calld->compression_algorithm]));
calld->written_initial_metadata = 1; /* GPR_TRUE */
}
@@ -282,19 +284,19 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
grpc_channel_args_get_compression_algorithm(args);
channeld->mdstr_request_compression_algorithm_key =
- grpc_mdstr_from_string(mdctx, GRPC_COMPRESS_REQUEST_ALGORITHM_KEY);
+ grpc_mdstr_from_string(mdctx, GRPC_COMPRESS_REQUEST_ALGORITHM_KEY, 0);
channeld->mdstr_outgoing_compression_algorithm_key =
- grpc_mdstr_from_string(mdctx, "grpc-encoding");
+ grpc_mdstr_from_string(mdctx, "grpc-encoding", 0);
for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) {
- char *algorith_name;
- GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorith_name) != 0);
+ char *algorithm_name;
+ GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorithm_name) != 0);
channeld->mdelem_compression_algorithms[algo_idx] =
grpc_mdelem_from_metadata_strings(
mdctx,
- grpc_mdstr_ref(channeld->mdstr_outgoing_compression_algorithm_key),
- grpc_mdstr_from_string(mdctx, algorith_name));
+ GRPC_MDSTR_REF(channeld->mdstr_outgoing_compression_algorithm_key),
+ grpc_mdstr_from_string(mdctx, algorithm_name, 0));
}
GPR_ASSERT(!is_last);
@@ -305,11 +307,11 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
channel_data *channeld = elem->channel_data;
grpc_compression_algorithm algo_idx;
- grpc_mdstr_unref(channeld->mdstr_request_compression_algorithm_key);
- grpc_mdstr_unref(channeld->mdstr_outgoing_compression_algorithm_key);
+ GRPC_MDSTR_UNREF(channeld->mdstr_request_compression_algorithm_key);
+ GRPC_MDSTR_UNREF(channeld->mdstr_outgoing_compression_algorithm_key);
for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT;
++algo_idx) {
- grpc_mdelem_unref(channeld->mdelem_compression_algorithms[algo_idx]);
+ GRPC_MDELEM_UNREF(channeld->mdelem_compression_algorithms[algo_idx]);
}
}
@@ -322,4 +324,5 @@ const grpc_channel_filter grpc_compress_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
+ grpc_call_next_get_peer,
"compress"};
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c
index 34d07de519..b95ed06f2b 100644
--- a/src/core/channel/connected_channel.c
+++ b/src/core/channel/connected_channel.c
@@ -119,6 +119,11 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
grpc_transport_destroy(cd->transport);
}
+static char *con_get_peer(grpc_call_element *elem) {
+ channel_data *chand = elem->channel_data;
+ return grpc_transport_get_peer(chand->transport);
+}
+
const grpc_channel_filter grpc_connected_channel_filter = {
con_start_transport_stream_op,
con_start_transport_op,
@@ -128,6 +133,7 @@ const grpc_channel_filter grpc_connected_channel_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
+ con_get_peer,
"connected",
};
diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c
index bc821b16fa..48c623d359 100644
--- a/src/core/channel/http_client_filter.c
+++ b/src/core/channel/http_client_filter.c
@@ -40,10 +40,12 @@
typedef struct call_data {
grpc_linked_mdelem method;
grpc_linked_mdelem scheme;
+ grpc_linked_mdelem authority;
grpc_linked_mdelem te_trailers;
grpc_linked_mdelem content_type;
grpc_linked_mdelem user_agent;
int sent_initial_metadata;
+ int sent_authority;
int got_initial_metadata;
grpc_stream_op_buffer *recv_ops;
@@ -62,6 +64,7 @@ typedef struct channel_data {
grpc_mdelem *scheme;
grpc_mdelem *content_type;
grpc_mdelem *status;
+ grpc_mdelem *default_authority;
/** complete user agent mdelem */
grpc_mdelem *user_agent;
} channel_data;
@@ -98,6 +101,23 @@ static void hc_on_recv(void *user_data, int success) {
calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success);
}
+static grpc_mdelem *client_strip_filter(void *user_data, grpc_mdelem *md) {
+ grpc_call_element *elem = user_data;
+ call_data *calld = elem->call_data;
+ channel_data *channeld = elem->channel_data;
+ /* eat the things we'd like to set ourselves */
+ if (md->key == channeld->method->key) return NULL;
+ if (md->key == channeld->scheme->key) return NULL;
+ if (md->key == channeld->te_trailers->key) return NULL;
+ if (md->key == channeld->content_type->key) return NULL;
+ if (md->key == channeld->user_agent->key) return NULL;
+ if (channeld->default_authority &&
+ channeld->default_authority->key == md->key) {
+ calld->sent_authority = 1;
+ }
+ return md;
+}
+
static void hc_mutate_op(grpc_call_element *elem,
grpc_transport_stream_op *op) {
/* grab pointers to our data from the call element */
@@ -111,12 +131,18 @@ static void hc_mutate_op(grpc_call_element *elem,
grpc_stream_op *op = &ops[i];
if (op->type != GRPC_OP_METADATA) continue;
calld->sent_initial_metadata = 1;
+ grpc_metadata_batch_filter(&op->data.metadata, client_strip_filter, elem);
/* Send : prefixed headers, which have to be before any application
layer headers. */
grpc_metadata_batch_add_head(&op->data.metadata, &calld->method,
GRPC_MDELEM_REF(channeld->method));
grpc_metadata_batch_add_head(&op->data.metadata, &calld->scheme,
GRPC_MDELEM_REF(channeld->scheme));
+ if (channeld->default_authority && !calld->sent_authority) {
+ grpc_metadata_batch_add_head(
+ &op->data.metadata, &calld->authority,
+ GRPC_MDELEM_REF(channeld->default_authority));
+ }
grpc_metadata_batch_add_tail(&op->data.metadata, &calld->te_trailers,
GRPC_MDELEM_REF(channeld->te_trailers));
grpc_metadata_batch_add_tail(&op->data.metadata, &calld->content_type,
@@ -149,6 +175,7 @@ static void init_call_elem(grpc_call_element *elem,
call_data *calld = elem->call_data;
calld->sent_initial_metadata = 0;
calld->got_initial_metadata = 0;
+ calld->sent_authority = 0;
calld->on_done_recv = NULL;
grpc_iomgr_closure_init(&calld->hc_on_recv, hc_on_recv, elem);
if (initial_op) hc_mutate_op(elem, initial_op);
@@ -220,7 +247,7 @@ static grpc_mdstr *user_agent_from_args(grpc_mdctx *mdctx,
tmp = gpr_strvec_flatten(&v, NULL);
gpr_strvec_destroy(&v);
- result = grpc_mdstr_from_string(mdctx, tmp);
+ result = grpc_mdstr_from_string(mdctx, tmp, 0);
gpr_free(tmp);
return result;
@@ -228,8 +255,10 @@ static grpc_mdstr *user_agent_from_args(grpc_mdctx *mdctx,
/* Constructor for channel_data */
static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
- const grpc_channel_args *args, grpc_mdctx *mdctx,
- int is_first, int is_last) {
+ const grpc_channel_args *channel_args,
+ grpc_mdctx *mdctx, int is_first, int is_last) {
+ size_t i;
+
/* grab pointers to our data from the channel element */
channel_data *channeld = elem->channel_data;
@@ -238,17 +267,32 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
path */
GPR_ASSERT(!is_last);
+ channeld->default_authority = NULL;
+ if (channel_args) {
+ for (i = 0; i < channel_args->num_args; i++) {
+ if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_DEFAULT_AUTHORITY)) {
+ if (channel_args->args[i].type != GRPC_ARG_STRING) {
+ gpr_log(GPR_ERROR, "%s: must be an string",
+ GRPC_ARG_DEFAULT_AUTHORITY);
+ } else {
+ channeld->default_authority = grpc_mdelem_from_strings(
+ mdctx, ":authority", channel_args->args[i].value.string);
+ }
+ }
+ }
+ }
+
/* initialize members */
channeld->te_trailers = grpc_mdelem_from_strings(mdctx, "te", "trailers");
channeld->method = grpc_mdelem_from_strings(mdctx, ":method", "POST");
- channeld->scheme =
- grpc_mdelem_from_strings(mdctx, ":scheme", scheme_from_args(args));
+ channeld->scheme = grpc_mdelem_from_strings(mdctx, ":scheme",
+ scheme_from_args(channel_args));
channeld->content_type =
grpc_mdelem_from_strings(mdctx, "content-type", "application/grpc");
channeld->status = grpc_mdelem_from_strings(mdctx, ":status", "200");
channeld->user_agent = grpc_mdelem_from_metadata_strings(
- mdctx, grpc_mdstr_from_string(mdctx, "user-agent"),
- user_agent_from_args(mdctx, args));
+ mdctx, grpc_mdstr_from_string(mdctx, "user-agent", 0),
+ user_agent_from_args(mdctx, channel_args));
}
/* Destructor for channel data */
@@ -262,9 +306,13 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
GRPC_MDELEM_UNREF(channeld->content_type);
GRPC_MDELEM_UNREF(channeld->status);
GRPC_MDELEM_UNREF(channeld->user_agent);
+ if (channeld->default_authority) {
+ GRPC_MDELEM_UNREF(channeld->default_authority);
+ }
}
const grpc_channel_filter grpc_http_client_filter = {
hc_start_transport_op, grpc_channel_next_op, sizeof(call_data),
init_call_elem, destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, "http-client"};
+ init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer,
+ "http-client"};
diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c
index a6cbb5a7f4..0955ae319a 100644
--- a/src/core/channel/http_server_filter.c
+++ b/src/core/channel/http_server_filter.c
@@ -44,6 +44,7 @@ typedef struct call_data {
gpr_uint8 sent_status;
gpr_uint8 seen_scheme;
gpr_uint8 seen_te_trailers;
+ gpr_uint8 seen_authority;
grpc_linked_mdelem status;
grpc_stream_op_buffer *recv_ops;
@@ -125,6 +126,9 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
}
calld->seen_path = 1;
return md;
+ } else if (md->key == channeld->authority_key) {
+ calld->seen_authority = 1;
+ return md;
} else if (md->key == channeld->host_key) {
/* translate host to :authority since :authority may be
omitted */
@@ -132,6 +136,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
channeld->mdctx, GRPC_MDSTR_REF(channeld->authority_key),
GRPC_MDSTR_REF(md->value));
GRPC_MDELEM_UNREF(md);
+ calld->seen_authority = 1;
return authority;
} else {
return md;
@@ -154,12 +159,15 @@ static void hs_on_recv(void *user_data, int success) {
(:method, :scheme, content-type, with :path and :authority covered
at the channel level right now) */
if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers &&
- calld->seen_path) {
+ calld->seen_path && calld->seen_authority) {
/* do nothing */
} else {
if (!calld->seen_path) {
gpr_log(GPR_ERROR, "Missing :path header");
}
+ if (!calld->seen_authority) {
+ gpr_log(GPR_ERROR, "Missing :authority header");
+ }
if (!calld->seen_post) {
gpr_log(GPR_ERROR, "Missing :method header");
}
@@ -250,9 +258,9 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
channeld->http_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "http");
channeld->https_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "https");
channeld->grpc_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "grpc");
- channeld->path_key = grpc_mdstr_from_string(mdctx, ":path");
- channeld->authority_key = grpc_mdstr_from_string(mdctx, ":authority");
- channeld->host_key = grpc_mdstr_from_string(mdctx, "host");
+ channeld->path_key = grpc_mdstr_from_string(mdctx, ":path", 0);
+ channeld->authority_key = grpc_mdstr_from_string(mdctx, ":authority", 0);
+ channeld->host_key = grpc_mdstr_from_string(mdctx, "host", 0);
channeld->content_type =
grpc_mdelem_from_strings(mdctx, "content-type", "application/grpc");
@@ -280,4 +288,5 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
const grpc_channel_filter grpc_http_server_filter = {
hs_start_transport_op, grpc_channel_next_op, sizeof(call_data),
init_call_elem, destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, "http-server"};
+ init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer,
+ "http-server"};
diff --git a/src/core/channel/noop_filter.c b/src/core/channel/noop_filter.c
index 5117723617..d631885aaf 100644
--- a/src/core/channel/noop_filter.c
+++ b/src/core/channel/noop_filter.c
@@ -127,4 +127,5 @@ const grpc_channel_filter grpc_no_op_filter = {noop_start_transport_stream_op,
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
+ grpc_call_next_get_peer,
"no-op"};