diff options
Diffstat (limited to 'src/core/channel')
-rw-r--r-- | src/core/channel/channel_stack.c | 5 | ||||
-rw-r--r-- | src/core/channel/channel_stack.h | 5 | ||||
-rw-r--r-- | src/core/channel/client_channel.c | 199 | ||||
-rw-r--r-- | src/core/channel/client_channel.h | 14 | ||||
-rw-r--r-- | src/core/channel/compress_filter.c | 25 | ||||
-rw-r--r-- | src/core/channel/connected_channel.c | 6 | ||||
-rw-r--r-- | src/core/channel/http_client_filter.c | 64 | ||||
-rw-r--r-- | src/core/channel/http_server_filter.c | 19 | ||||
-rw-r--r-- | src/core/channel/noop_filter.c | 1 |
9 files changed, 293 insertions, 45 deletions
diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index e38dcb58b7..cd7c182ef2 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -191,6 +191,11 @@ void grpc_call_next_op(grpc_call_element *elem, grpc_transport_stream_op *op) { next_elem->filter->start_transport_stream_op(next_elem, op); } +char *grpc_call_next_get_peer(grpc_call_element *elem) { + grpc_call_element *next_elem = elem + 1; + return next_elem->filter->get_peer(next_elem); +} + void grpc_channel_next_op(grpc_channel_element *elem, grpc_transport_op *op) { grpc_channel_element *next_elem = elem + 1; next_elem->filter->start_transport_op(next_elem, op); diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index 785be8925b..4a608b956e 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -104,6 +104,9 @@ typedef struct { The filter does not need to do any chaining */ void (*destroy_channel_elem)(grpc_channel_element *elem); + /* Implement grpc_call_get_peer() */ + char *(*get_peer)(grpc_call_element *elem); + /* The name of this filter */ const char *name; } grpc_channel_filter; @@ -173,6 +176,8 @@ void grpc_call_next_op(grpc_call_element *elem, grpc_transport_stream_op *op); /* Call the next operation (depending on call directionality) in a channel stack */ void grpc_channel_next_op(grpc_channel_element *elem, grpc_transport_op *op); +/* Pass through a request to get_peer to the next child element */ +char *grpc_call_next_get_peer(grpc_call_element *elem); /* Given the top element of a channel stack, get the channel stack itself */ grpc_channel_stack *grpc_channel_stack_from_top_element( diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index c1aa580b2d..a293c93ec6 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -40,7 +40,6 @@ #include "src/core/channel/connected_channel.h" #include "src/core/surface/channel.h" #include "src/core/iomgr/iomgr.h" -#include "src/core/iomgr/pollset_set.h" #include "src/core/support/string.h" #include "src/core/transport/connectivity_state.h" #include <grpc/support/alloc.h> @@ -57,6 +56,8 @@ typedef struct { grpc_mdctx *mdctx; /** resolver for this channel */ grpc_resolver *resolver; + /** have we started resolving this channel */ + int started_resolving; /** master channel - the grpc_channel instance that ultimately owns this channel_data via its channel stack. We occasionally use this to bump the refcount on the master channel @@ -77,8 +78,22 @@ typedef struct { grpc_iomgr_closure on_config_changed; /** connectivity state being tracked */ grpc_connectivity_state_tracker state_tracker; + /** when an lb_policy arrives, should we try to exit idle */ + int exit_idle_when_lb_policy_arrives; + /** pollset_set of interested parties in a new connection */ + grpc_pollset_set pollset_set; } channel_data; +/** We create one watcher for each new lb_policy that is returned from a resolver, + to watch for state changes from the lb_policy. When a state change is seen, we + update the channel, and create a new watcher */ +typedef struct { + channel_data *chand; + grpc_iomgr_closure on_changed; + grpc_connectivity_state state; + grpc_lb_policy *lb_policy; +} lb_policy_connectivity_watcher; + typedef enum { CALL_CREATED, CALL_WAITING_FOR_SEND, @@ -265,6 +280,26 @@ static grpc_iomgr_closure *merge_into_waiting_op( return consumed_op; } +static char *cc_get_peer(grpc_call_element *elem) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + grpc_subchannel_call *subchannel_call; + char *result; + + gpr_mu_lock(&calld->mu_state); + if (calld->state == CALL_ACTIVE) { + subchannel_call = calld->subchannel_call; + GRPC_SUBCHANNEL_CALL_REF(subchannel_call, "get_peer"); + gpr_mu_unlock(&calld->mu_state); + result = grpc_subchannel_call_get_peer(subchannel_call); + GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "get_peer"); + return result; + } else { + gpr_mu_unlock(&calld->mu_state); + return grpc_channel_get_target(chand->master); + } +} + static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_stream_op *op, int continuation) { @@ -365,6 +400,13 @@ static void perform_transport_stream_op(grpc_call_element *elem, } else if (chand->resolver != NULL) { calld->state = CALL_WAITING_FOR_CONFIG; add_to_lb_policy_wait_queue_locked_state_config(elem); + if (!chand->started_resolving && chand->resolver != NULL) { + GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); + chand->started_resolving = 1; + grpc_resolver_next(chand->resolver, + &chand->incoming_configuration, + &chand->on_config_changed); + } gpr_mu_unlock(&chand->mu_config); gpr_mu_unlock(&calld->mu_state); } else { @@ -388,16 +430,53 @@ static void cc_start_transport_stream_op(grpc_call_element *elem, perform_transport_stream_op(elem, op, 0); } +static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, grpc_connectivity_state current_state); + +static void on_lb_policy_state_changed(void *arg, int iomgr_success) { + lb_policy_connectivity_watcher *w = arg; + + gpr_mu_lock(&w->chand->mu_config); + /* check if the notification is for a stale policy */ + if (w->lb_policy == w->chand->lb_policy) { + grpc_connectivity_state_set(&w->chand->state_tracker, w->state, + "lb_changed"); + if (w->state != GRPC_CHANNEL_FATAL_FAILURE) { + watch_lb_policy(w->chand, w->lb_policy, w->state); + } + } + gpr_mu_unlock(&w->chand->mu_config); + + GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy"); + gpr_free(w); +} + +static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, grpc_connectivity_state current_state) { + lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w)); + GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy"); + + w->chand = chand; + grpc_iomgr_closure_init(&w->on_changed, on_lb_policy_state_changed, w); + w->state = current_state; + w->lb_policy = lb_policy; + grpc_lb_policy_notify_on_state_change(lb_policy, &w->state, &w->on_changed); +} + static void cc_on_config_changed(void *arg, int iomgr_success) { channel_data *chand = arg; grpc_lb_policy *lb_policy = NULL; grpc_lb_policy *old_lb_policy; grpc_resolver *old_resolver; grpc_iomgr_closure *wakeup_closures = NULL; + grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE; + int exit_idle = 0; if (chand->incoming_configuration != NULL) { lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration); - GRPC_LB_POLICY_REF(lb_policy, "channel"); + if (lb_policy != NULL) { + GRPC_LB_POLICY_REF(lb_policy, "channel"); + GRPC_LB_POLICY_REF(lb_policy, "config_change"); + state = grpc_lb_policy_check_connectivity(lb_policy); + } grpc_client_config_unref(chand->incoming_configuration); } @@ -411,13 +490,12 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { wakeup_closures = chand->waiting_for_config_closures; chand->waiting_for_config_closures = NULL; } - gpr_mu_unlock(&chand->mu_config); - - if (old_lb_policy) { - GRPC_LB_POLICY_UNREF(old_lb_policy, "channel"); + if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) { + GRPC_LB_POLICY_REF(lb_policy, "exit_idle"); + exit_idle = 1; + chand->exit_idle_when_lb_policy_arrives = 0; } - gpr_mu_lock(&chand->mu_config); if (iomgr_success && chand->resolver) { grpc_resolver *resolver = chand->resolver; GRPC_RESOLVER_REF(resolver, "channel-next"); @@ -426,11 +504,16 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { grpc_resolver_next(resolver, &chand->incoming_configuration, &chand->on_config_changed); GRPC_RESOLVER_UNREF(resolver, "channel-next"); + grpc_connectivity_state_set(&chand->state_tracker, state, + "new_lb+resolver"); + if (lb_policy != NULL) { + watch_lb_policy(chand, lb_policy, state); + } } else { old_resolver = chand->resolver; chand->resolver = NULL; grpc_connectivity_state_set(&chand->state_tracker, - GRPC_CHANNEL_FATAL_FAILURE); + GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone"); gpr_mu_unlock(&chand->mu_config); if (old_resolver != NULL) { grpc_resolver_shutdown(old_resolver); @@ -438,12 +521,24 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { } } + if (exit_idle) { + grpc_lb_policy_exit_idle(lb_policy); + GRPC_LB_POLICY_UNREF(lb_policy, "exit_idle"); + } + + if (old_lb_policy != NULL) { + GRPC_LB_POLICY_UNREF(old_lb_policy, "channel"); + } + while (wakeup_closures) { grpc_iomgr_closure *next = wakeup_closures->next; - grpc_iomgr_add_callback(wakeup_closures); + wakeup_closures->cb(wakeup_closures->cb_arg, 1); wakeup_closures = next; } + if (lb_policy != NULL) { + GRPC_LB_POLICY_UNREF(lb_policy, "config_change"); + } GRPC_CHANNEL_INTERNAL_UNREF(chand->master, "resolver"); } @@ -467,20 +562,22 @@ static void cc_start_transport_op(grpc_channel_element *elem, op->connectivity_state = NULL; } + if (!is_empty(op, sizeof(*op))) { + lb_policy = chand->lb_policy; + if (lb_policy) { + GRPC_LB_POLICY_REF(lb_policy, "broadcast"); + } + } + if (op->disconnect && chand->resolver != NULL) { grpc_connectivity_state_set(&chand->state_tracker, - GRPC_CHANNEL_FATAL_FAILURE); + GRPC_CHANNEL_FATAL_FAILURE, "disconnect"); destroy_resolver = chand->resolver; chand->resolver = NULL; if (chand->lb_policy != NULL) { grpc_lb_policy_shutdown(chand->lb_policy); - } - } - - if (!is_empty(op, sizeof(*op))) { - lb_policy = chand->lb_policy; - if (lb_policy) { - GRPC_LB_POLICY_REF(lb_policy, "broadcast"); + GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel"); + chand->lb_policy = NULL; } } gpr_mu_unlock(&chand->mu_config); @@ -561,10 +658,11 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, gpr_mu_init(&chand->mu_config); chand->mdctx = metadata_context; chand->master = master; + grpc_pollset_set_init(&chand->pollset_set); grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed, chand); - grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE); + grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); } /* Destructor for channel_data */ @@ -578,6 +676,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) { if (chand->lb_policy != NULL) { GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel"); } + grpc_connectivity_state_destroy(&chand->state_tracker); + grpc_pollset_set_destroy(&chand->pollset_set); gpr_mu_destroy(&chand->mu_config); } @@ -590,6 +690,7 @@ const grpc_channel_filter grpc_client_channel_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, + cc_get_peer, "client-channel", }; @@ -598,10 +699,66 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack, /* post construction initialization: set the transport setup pointer */ grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack); channel_data *chand = elem->channel_data; + gpr_mu_lock(&chand->mu_config); GPR_ASSERT(!chand->resolver); chand->resolver = resolver; - GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); GRPC_RESOLVER_REF(resolver, "channel"); - grpc_resolver_next(resolver, &chand->incoming_configuration, - &chand->on_config_changed); + if (chand->waiting_for_config_closures != NULL || + chand->exit_idle_when_lb_policy_arrives) { + chand->started_resolving = 1; + GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); + grpc_resolver_next(resolver, &chand->incoming_configuration, + &chand->on_config_changed); + } + gpr_mu_unlock(&chand->mu_config); +} + +grpc_connectivity_state grpc_client_channel_check_connectivity_state( + grpc_channel_element *elem, int try_to_connect) { + channel_data *chand = elem->channel_data; + grpc_connectivity_state out; + gpr_mu_lock(&chand->mu_config); + out = grpc_connectivity_state_check(&chand->state_tracker); + if (out == GRPC_CHANNEL_IDLE && try_to_connect) { + if (chand->lb_policy != NULL) { + grpc_lb_policy_exit_idle(chand->lb_policy); + } else { + chand->exit_idle_when_lb_policy_arrives = 1; + if (!chand->started_resolving && chand->resolver != NULL) { + GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); + chand->started_resolving = 1; + grpc_resolver_next(chand->resolver, &chand->incoming_configuration, + &chand->on_config_changed); + } + } + } + gpr_mu_unlock(&chand->mu_config); + return out; +} + +void grpc_client_channel_watch_connectivity_state( + grpc_channel_element *elem, grpc_connectivity_state *state, + grpc_iomgr_closure *on_complete) { + channel_data *chand = elem->channel_data; + gpr_mu_lock(&chand->mu_config); + grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, state, + on_complete); + gpr_mu_unlock(&chand->mu_config); +} + +grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(grpc_channel_element *elem) { + channel_data *chand = elem->channel_data; + return &chand->pollset_set; +} + +void grpc_client_channel_add_interested_party(grpc_channel_element *elem, + grpc_pollset *pollset) { + channel_data *chand = elem->channel_data; + grpc_pollset_set_add_pollset(&chand->pollset_set, pollset); +} + +void grpc_client_channel_del_interested_party(grpc_channel_element *elem, + grpc_pollset *pollset) { + channel_data *chand = elem->channel_data; + grpc_pollset_set_del_pollset(&chand->pollset_set, pollset); } diff --git a/src/core/channel/client_channel.h b/src/core/channel/client_channel.h index fd2be46145..cd81294eb3 100644 --- a/src/core/channel/client_channel.h +++ b/src/core/channel/client_channel.h @@ -52,4 +52,18 @@ extern const grpc_channel_filter grpc_client_channel_filter; void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack, grpc_resolver *resolver); +grpc_connectivity_state grpc_client_channel_check_connectivity_state( + grpc_channel_element *elem, int try_to_connect); + +void grpc_client_channel_watch_connectivity_state( + grpc_channel_element *elem, grpc_connectivity_state *state, + grpc_iomgr_closure *on_complete); + +grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(grpc_channel_element *elem); + +void grpc_client_channel_add_interested_party(grpc_channel_element *channel, + grpc_pollset *pollset); +void grpc_client_channel_del_interested_party(grpc_channel_element *channel, + grpc_pollset *pollset); + #endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_CHANNEL_H */ diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index 14cb3da62d..8963c13b0f 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -174,6 +174,8 @@ static void process_send_ops(grpc_call_element *elem, size_t i; int did_compress = 0; + /* In streaming calls, we need to reset the previously accumulated slices */ + gpr_slice_buffer_reset_and_unref(&calld->slices); for (i = 0; i < send_ops->nops; ++i) { grpc_stream_op *sop = &send_ops->ops[i]; switch (sop->type) { @@ -200,9 +202,9 @@ static void process_send_ops(grpc_call_element *elem, channeld->default_compression_algorithm; calld->has_compression_algorithm = 1; /* GPR_TRUE */ } - grpc_metadata_batch_add_head( + grpc_metadata_batch_add_tail( &(sop->data.metadata), &calld->compression_algorithm_storage, - grpc_mdelem_ref(channeld->mdelem_compression_algorithms + GRPC_MDELEM_REF(channeld->mdelem_compression_algorithms [calld->compression_algorithm])); calld->written_initial_metadata = 1; /* GPR_TRUE */ } @@ -282,19 +284,19 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, grpc_channel_args_get_compression_algorithm(args); channeld->mdstr_request_compression_algorithm_key = - grpc_mdstr_from_string(mdctx, GRPC_COMPRESS_REQUEST_ALGORITHM_KEY); + grpc_mdstr_from_string(mdctx, GRPC_COMPRESS_REQUEST_ALGORITHM_KEY, 0); channeld->mdstr_outgoing_compression_algorithm_key = - grpc_mdstr_from_string(mdctx, "grpc-encoding"); + grpc_mdstr_from_string(mdctx, "grpc-encoding", 0); for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) { - char *algorith_name; - GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorith_name) != 0); + char *algorithm_name; + GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorithm_name) != 0); channeld->mdelem_compression_algorithms[algo_idx] = grpc_mdelem_from_metadata_strings( mdctx, - grpc_mdstr_ref(channeld->mdstr_outgoing_compression_algorithm_key), - grpc_mdstr_from_string(mdctx, algorith_name)); + GRPC_MDSTR_REF(channeld->mdstr_outgoing_compression_algorithm_key), + grpc_mdstr_from_string(mdctx, algorithm_name, 0)); } GPR_ASSERT(!is_last); @@ -305,11 +307,11 @@ static void destroy_channel_elem(grpc_channel_element *elem) { channel_data *channeld = elem->channel_data; grpc_compression_algorithm algo_idx; - grpc_mdstr_unref(channeld->mdstr_request_compression_algorithm_key); - grpc_mdstr_unref(channeld->mdstr_outgoing_compression_algorithm_key); + GRPC_MDSTR_UNREF(channeld->mdstr_request_compression_algorithm_key); + GRPC_MDSTR_UNREF(channeld->mdstr_outgoing_compression_algorithm_key); for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) { - grpc_mdelem_unref(channeld->mdelem_compression_algorithms[algo_idx]); + GRPC_MDELEM_UNREF(channeld->mdelem_compression_algorithms[algo_idx]); } } @@ -322,4 +324,5 @@ const grpc_channel_filter grpc_compress_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, "compress"}; diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index 34d07de519..b95ed06f2b 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -119,6 +119,11 @@ static void destroy_channel_elem(grpc_channel_element *elem) { grpc_transport_destroy(cd->transport); } +static char *con_get_peer(grpc_call_element *elem) { + channel_data *chand = elem->channel_data; + return grpc_transport_get_peer(chand->transport); +} + const grpc_channel_filter grpc_connected_channel_filter = { con_start_transport_stream_op, con_start_transport_op, @@ -128,6 +133,7 @@ const grpc_channel_filter grpc_connected_channel_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, + con_get_peer, "connected", }; diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index bc821b16fa..48c623d359 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -40,10 +40,12 @@ typedef struct call_data { grpc_linked_mdelem method; grpc_linked_mdelem scheme; + grpc_linked_mdelem authority; grpc_linked_mdelem te_trailers; grpc_linked_mdelem content_type; grpc_linked_mdelem user_agent; int sent_initial_metadata; + int sent_authority; int got_initial_metadata; grpc_stream_op_buffer *recv_ops; @@ -62,6 +64,7 @@ typedef struct channel_data { grpc_mdelem *scheme; grpc_mdelem *content_type; grpc_mdelem *status; + grpc_mdelem *default_authority; /** complete user agent mdelem */ grpc_mdelem *user_agent; } channel_data; @@ -98,6 +101,23 @@ static void hc_on_recv(void *user_data, int success) { calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success); } +static grpc_mdelem *client_strip_filter(void *user_data, grpc_mdelem *md) { + grpc_call_element *elem = user_data; + call_data *calld = elem->call_data; + channel_data *channeld = elem->channel_data; + /* eat the things we'd like to set ourselves */ + if (md->key == channeld->method->key) return NULL; + if (md->key == channeld->scheme->key) return NULL; + if (md->key == channeld->te_trailers->key) return NULL; + if (md->key == channeld->content_type->key) return NULL; + if (md->key == channeld->user_agent->key) return NULL; + if (channeld->default_authority && + channeld->default_authority->key == md->key) { + calld->sent_authority = 1; + } + return md; +} + static void hc_mutate_op(grpc_call_element *elem, grpc_transport_stream_op *op) { /* grab pointers to our data from the call element */ @@ -111,12 +131,18 @@ static void hc_mutate_op(grpc_call_element *elem, grpc_stream_op *op = &ops[i]; if (op->type != GRPC_OP_METADATA) continue; calld->sent_initial_metadata = 1; + grpc_metadata_batch_filter(&op->data.metadata, client_strip_filter, elem); /* Send : prefixed headers, which have to be before any application layer headers. */ grpc_metadata_batch_add_head(&op->data.metadata, &calld->method, GRPC_MDELEM_REF(channeld->method)); grpc_metadata_batch_add_head(&op->data.metadata, &calld->scheme, GRPC_MDELEM_REF(channeld->scheme)); + if (channeld->default_authority && !calld->sent_authority) { + grpc_metadata_batch_add_head( + &op->data.metadata, &calld->authority, + GRPC_MDELEM_REF(channeld->default_authority)); + } grpc_metadata_batch_add_tail(&op->data.metadata, &calld->te_trailers, GRPC_MDELEM_REF(channeld->te_trailers)); grpc_metadata_batch_add_tail(&op->data.metadata, &calld->content_type, @@ -149,6 +175,7 @@ static void init_call_elem(grpc_call_element *elem, call_data *calld = elem->call_data; calld->sent_initial_metadata = 0; calld->got_initial_metadata = 0; + calld->sent_authority = 0; calld->on_done_recv = NULL; grpc_iomgr_closure_init(&calld->hc_on_recv, hc_on_recv, elem); if (initial_op) hc_mutate_op(elem, initial_op); @@ -220,7 +247,7 @@ static grpc_mdstr *user_agent_from_args(grpc_mdctx *mdctx, tmp = gpr_strvec_flatten(&v, NULL); gpr_strvec_destroy(&v); - result = grpc_mdstr_from_string(mdctx, tmp); + result = grpc_mdstr_from_string(mdctx, tmp, 0); gpr_free(tmp); return result; @@ -228,8 +255,10 @@ static grpc_mdstr *user_agent_from_args(grpc_mdctx *mdctx, /* Constructor for channel_data */ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, - const grpc_channel_args *args, grpc_mdctx *mdctx, - int is_first, int is_last) { + const grpc_channel_args *channel_args, + grpc_mdctx *mdctx, int is_first, int is_last) { + size_t i; + /* grab pointers to our data from the channel element */ channel_data *channeld = elem->channel_data; @@ -238,17 +267,32 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, path */ GPR_ASSERT(!is_last); + channeld->default_authority = NULL; + if (channel_args) { + for (i = 0; i < channel_args->num_args; i++) { + if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_DEFAULT_AUTHORITY)) { + if (channel_args->args[i].type != GRPC_ARG_STRING) { + gpr_log(GPR_ERROR, "%s: must be an string", + GRPC_ARG_DEFAULT_AUTHORITY); + } else { + channeld->default_authority = grpc_mdelem_from_strings( + mdctx, ":authority", channel_args->args[i].value.string); + } + } + } + } + /* initialize members */ channeld->te_trailers = grpc_mdelem_from_strings(mdctx, "te", "trailers"); channeld->method = grpc_mdelem_from_strings(mdctx, ":method", "POST"); - channeld->scheme = - grpc_mdelem_from_strings(mdctx, ":scheme", scheme_from_args(args)); + channeld->scheme = grpc_mdelem_from_strings(mdctx, ":scheme", + scheme_from_args(channel_args)); channeld->content_type = grpc_mdelem_from_strings(mdctx, "content-type", "application/grpc"); channeld->status = grpc_mdelem_from_strings(mdctx, ":status", "200"); channeld->user_agent = grpc_mdelem_from_metadata_strings( - mdctx, grpc_mdstr_from_string(mdctx, "user-agent"), - user_agent_from_args(mdctx, args)); + mdctx, grpc_mdstr_from_string(mdctx, "user-agent", 0), + user_agent_from_args(mdctx, channel_args)); } /* Destructor for channel data */ @@ -262,9 +306,13 @@ static void destroy_channel_elem(grpc_channel_element *elem) { GRPC_MDELEM_UNREF(channeld->content_type); GRPC_MDELEM_UNREF(channeld->status); GRPC_MDELEM_UNREF(channeld->user_agent); + if (channeld->default_authority) { + GRPC_MDELEM_UNREF(channeld->default_authority); + } } const grpc_channel_filter grpc_http_client_filter = { hc_start_transport_op, grpc_channel_next_op, sizeof(call_data), init_call_elem, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "http-client"}; + init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + "http-client"}; diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index a6cbb5a7f4..0955ae319a 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -44,6 +44,7 @@ typedef struct call_data { gpr_uint8 sent_status; gpr_uint8 seen_scheme; gpr_uint8 seen_te_trailers; + gpr_uint8 seen_authority; grpc_linked_mdelem status; grpc_stream_op_buffer *recv_ops; @@ -125,6 +126,9 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { } calld->seen_path = 1; return md; + } else if (md->key == channeld->authority_key) { + calld->seen_authority = 1; + return md; } else if (md->key == channeld->host_key) { /* translate host to :authority since :authority may be omitted */ @@ -132,6 +136,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { channeld->mdctx, GRPC_MDSTR_REF(channeld->authority_key), GRPC_MDSTR_REF(md->value)); GRPC_MDELEM_UNREF(md); + calld->seen_authority = 1; return authority; } else { return md; @@ -154,12 +159,15 @@ static void hs_on_recv(void *user_data, int success) { (:method, :scheme, content-type, with :path and :authority covered at the channel level right now) */ if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers && - calld->seen_path) { + calld->seen_path && calld->seen_authority) { /* do nothing */ } else { if (!calld->seen_path) { gpr_log(GPR_ERROR, "Missing :path header"); } + if (!calld->seen_authority) { + gpr_log(GPR_ERROR, "Missing :authority header"); + } if (!calld->seen_post) { gpr_log(GPR_ERROR, "Missing :method header"); } @@ -250,9 +258,9 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, channeld->http_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "http"); channeld->https_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "https"); channeld->grpc_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "grpc"); - channeld->path_key = grpc_mdstr_from_string(mdctx, ":path"); - channeld->authority_key = grpc_mdstr_from_string(mdctx, ":authority"); - channeld->host_key = grpc_mdstr_from_string(mdctx, "host"); + channeld->path_key = grpc_mdstr_from_string(mdctx, ":path", 0); + channeld->authority_key = grpc_mdstr_from_string(mdctx, ":authority", 0); + channeld->host_key = grpc_mdstr_from_string(mdctx, "host", 0); channeld->content_type = grpc_mdelem_from_strings(mdctx, "content-type", "application/grpc"); @@ -280,4 +288,5 @@ static void destroy_channel_elem(grpc_channel_element *elem) { const grpc_channel_filter grpc_http_server_filter = { hs_start_transport_op, grpc_channel_next_op, sizeof(call_data), init_call_elem, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "http-server"}; + init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + "http-server"}; diff --git a/src/core/channel/noop_filter.c b/src/core/channel/noop_filter.c index 5117723617..d631885aaf 100644 --- a/src/core/channel/noop_filter.c +++ b/src/core/channel/noop_filter.c @@ -127,4 +127,5 @@ const grpc_channel_filter grpc_no_op_filter = {noop_start_transport_stream_op, sizeof(channel_data), init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, "no-op"}; |