diff options
Diffstat (limited to 'src/core/channel')
-rw-r--r-- | src/core/channel/client_channel.c | 3 | ||||
-rw-r--r-- | src/core/channel/client_uchannel.c | 6 | ||||
-rw-r--r-- | src/core/channel/client_uchannel.h | 3 | ||||
-rw-r--r-- | src/core/channel/compress_filter.c | 62 | ||||
-rw-r--r-- | src/core/channel/http_client_filter.c | 150 | ||||
-rw-r--r-- | src/core/channel/http_server_filter.c | 104 | ||||
-rw-r--r-- | src/core/channel/subchannel_call_holder.c | 15 | ||||
-rw-r--r-- | src/core/channel/subchannel_call_holder.h | 4 |
8 files changed, 89 insertions, 258 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 020138bf15..1abcd3b9cc 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -364,7 +364,8 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, /* Constructor for call_data */ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_call_element_args *args) { - grpc_subchannel_call_holder_init(elem->call_data, cc_pick_subchannel, elem); + grpc_subchannel_call_holder_init(elem->call_data, cc_pick_subchannel, elem, + args->metadata_context); } /* Destructor for call_data */ diff --git a/src/core/channel/client_uchannel.c b/src/core/channel/client_uchannel.c index cf5e3bf482..3276635625 100644 --- a/src/core/channel/client_uchannel.c +++ b/src/core/channel/client_uchannel.c @@ -140,7 +140,7 @@ static int cuc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg, static void cuc_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_call_element_args *args) { grpc_subchannel_call_holder_init(elem->call_data, cuc_pick_subchannel, - elem->channel_data); + elem->channel_data, args->metadata_context); } /* Destructor for call_data */ @@ -244,11 +244,11 @@ void grpc_client_uchannel_del_interested_party(grpc_exec_ctx *exec_ctx, } grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel, - grpc_channel_args *args) { + grpc_channel_args *args, + grpc_mdctx *mdctx) { grpc_channel *channel = NULL; #define MAX_FILTERS 3 const grpc_channel_filter *filters[MAX_FILTERS]; - grpc_mdctx *mdctx = grpc_subchannel_get_mdctx(subchannel); grpc_channel *master = grpc_subchannel_get_master(subchannel); char *target = grpc_channel_get_target(master); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; diff --git a/src/core/channel/client_uchannel.h b/src/core/channel/client_uchannel.h index dfe6695ae3..54fbea964c 100644 --- a/src/core/channel/client_uchannel.h +++ b/src/core/channel/client_uchannel.h @@ -62,7 +62,8 @@ void grpc_client_uchannel_del_interested_party(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset); grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel, - grpc_channel_args *args); + grpc_channel_args *args, + grpc_mdctx *mdctx); void grpc_client_uchannel_set_subchannel(grpc_channel *uchannel, grpc_subchannel *subchannel); diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index 62219f5aa7..25d6e51281 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -42,6 +42,7 @@ #include "src/core/channel/compress_filter.h" #include "src/core/channel/channel_args.h" #include "src/core/profiling/timers.h" +#include "src/core/compression/algorithm_metadata.h" #include "src/core/compression/message_compress.h" #include "src/core/support/string.h" #include "src/core/transport/static_metadata.h" @@ -65,6 +66,8 @@ typedef struct call_data { grpc_closure *post_send; grpc_closure send_done; grpc_closure got_slice; + + grpc_mdctx *mdctx; } call_data; typedef struct channel_data { @@ -72,6 +75,8 @@ typedef struct channel_data { grpc_compression_algorithm default_compression_algorithm; /** Compression options for the channel */ grpc_compression_options compression_options; + /** Supported compression algorithms */ + gpr_uint32 supported_compression_algorithms; } channel_data; /** For each \a md element from the incoming metadata, filter out the entry for @@ -82,7 +87,7 @@ static grpc_mdelem *compression_md_filter(void *user_data, grpc_mdelem *md) { call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; - if (md->key == GRPC_MDSTR_GRPC_ENCODING) { + if (md->key == GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST) { const char *md_c_str = grpc_mdstr_as_c_string(md->value); if (!grpc_compression_algorithm_parse(md_c_str, strlen(md_c_str), &calld->compression_algorithm)) { @@ -138,14 +143,13 @@ static void process_send_initial_metadata( /* hint compression algorithm */ grpc_metadata_batch_add_tail( initial_metadata, &calld->compression_algorithm_storage, - GRPC_MDELEM_REF( - channeld - ->mdelem_compression_algorithms[calld->compression_algorithm])); + grpc_compression_encoding_mdelem(calld->compression_algorithm)); /* convey supported compression algorithms */ grpc_metadata_batch_add_tail( initial_metadata, &calld->accept_encoding_storage, - GRPC_MDELEM_REF(channeld->mdelem_accept_encoding)); + GRPC_MDELEM_REF(grpc_accept_encoding_mdelem_from_compression_algorithms( + calld->mdctx, channeld->supported_compression_algorithms))); } static void continue_send_message(grpc_exec_ctx *exec_ctx, @@ -239,6 +243,7 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, /* initialize members */ gpr_slice_buffer_init(&calld->slices); calld->has_compression_algorithm = 0; + calld->mdctx = args->metadata_context; grpc_closure_init(&calld->got_slice, got_slice, elem); grpc_closure_init(&calld->send_done, send_done, elem); } @@ -257,10 +262,6 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element_args *args) { channel_data *channeld = elem->channel_data; grpc_compression_algorithm algo_idx; - const char *supported_algorithms_names[GRPC_COMPRESS_ALGORITHMS_COUNT - 1]; - size_t supported_algorithms_idx = 0; - char *accept_encoding_str; - size_t accept_encoding_str_len; grpc_compression_options_init(&channeld->compression_options); channeld->compression_options.enabled_algorithms_bitset = @@ -275,61 +276,22 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, channeld->compression_options.default_compression_algorithm = channeld->default_compression_algorithm; - channeld->mdstr_request_compression_algorithm_key = grpc_mdstr_from_string( - args->metadata_context, GRPC_COMPRESS_REQUEST_ALGORITHM_KEY); - - channeld->mdstr_outgoing_compression_algorithm_key = - grpc_mdstr_from_string(args->metadata_context, "grpc-encoding"); - - channeld->mdstr_compression_capabilities_key = - grpc_mdstr_from_string(args->metadata_context, "grpc-accept-encoding"); - + channeld->supported_compression_algorithms = 0; for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) { - char *algorithm_name; /* skip disabled algorithms */ if (grpc_compression_options_is_algorithm_enabled( &channeld->compression_options, algo_idx) == 0) { continue; } - GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorithm_name) != 0); - channeld->mdelem_compression_algorithms[algo_idx] = - grpc_mdelem_from_metadata_strings( - args->metadata_context, - GRPC_MDSTR_REF(channeld->mdstr_outgoing_compression_algorithm_key), - grpc_mdstr_from_string(args->metadata_context, algorithm_name)); - if (algo_idx > 0) { - supported_algorithms_names[supported_algorithms_idx++] = algorithm_name; - } + channeld->supported_compression_algorithms |= 1u << algo_idx; } - /* TODO(dgq): gpr_strjoin_sep could be made to work with statically allocated - * arrays, as to avoid the heap allocs */ - accept_encoding_str = - gpr_strjoin_sep(supported_algorithms_names, supported_algorithms_idx, ",", - &accept_encoding_str_len); - - channeld->mdelem_accept_encoding = grpc_mdelem_from_metadata_strings( - args->metadata_context, - GRPC_MDSTR_REF(channeld->mdstr_compression_capabilities_key), - grpc_mdstr_from_string(args->metadata_context, accept_encoding_str)); - gpr_free(accept_encoding_str); - GPR_ASSERT(!args->is_last); } /* Destructor for channel data */ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) { - channel_data *channeld = elem->channel_data; - grpc_compression_algorithm algo_idx; - - GRPC_MDSTR_UNREF(channeld->mdstr_request_compression_algorithm_key); - GRPC_MDSTR_UNREF(channeld->mdstr_outgoing_compression_algorithm_key); - GRPC_MDSTR_UNREF(channeld->mdstr_compression_capabilities_key); - for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) { - GRPC_MDELEM_UNREF(channeld->mdelem_compression_algorithms[algo_idx]); - } - GRPC_MDELEM_UNREF(channeld->mdelem_accept_encoding); } const grpc_channel_filter grpc_compress_filter = { diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index 3a0f68f30f..ec7791656f 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -37,6 +37,7 @@ #include <grpc/support/string_util.h> #include "src/core/support/string.h" #include "src/core/profiling/timers.h" +#include "src/core/transport/static_metadata.h" typedef struct call_data { grpc_linked_mdelem method; @@ -54,17 +55,11 @@ typedef struct call_data { up-call on transport_op, and remember to call our on_done_recv member after handling it. */ grpc_closure hc_on_recv; + + grpc_mdctx *mdctx; } call_data; -typedef struct channel_data { - grpc_mdelem *te_trailers; - grpc_mdelem *method; - grpc_mdelem *scheme; - grpc_mdelem *content_type; - grpc_mdelem *status; - /** complete user agent mdelem */ - grpc_mdelem *user_agent; -} channel_data; +typedef struct channel_data { grpc_mdelem *static_scheme; } channel_data; typedef struct { grpc_call_element *elem; @@ -73,14 +68,12 @@ typedef struct { static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) { client_recv_filter_args *a = user_data; - grpc_call_element *elem = a->elem; - channel_data *channeld = elem->channel_data; - if (md == channeld->status) { + if (md == GRPC_MDELEM_STATUS_200) { return NULL; - } else if (md->key == channeld->status->key) { - grpc_call_element_send_cancel(a->exec_ctx, elem); + } else if (md->key == GRPC_MDSTR_STATUS) { + grpc_call_element_send_cancel(a->exec_ctx, a->elem); return NULL; - } else if (md->key == channeld->content_type->key) { + } else if (md->key == GRPC_MDSTR_CONTENT_TYPE) { return NULL; } return md; @@ -98,14 +91,12 @@ static void hc_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, int success) { } static grpc_mdelem *client_strip_filter(void *user_data, grpc_mdelem *md) { - grpc_call_element *elem = user_data; - channel_data *channeld = elem->channel_data; /* eat the things we'd like to set ourselves */ - if (md->key == channeld->method->key) return NULL; - if (md->key == channeld->scheme->key) return NULL; - if (md->key == channeld->te_trailers->key) return NULL; - if (md->key == channeld->content_type->key) return NULL; - if (md->key == channeld->user_agent->key) return NULL; + if (md->key == GRPC_MDSTR_METHOD) return NULL; + if (md->key == GRPC_MDSTR_SCHEME) return NULL; + if (md->key == GRPC_MDSTR_TE) return NULL; + if (md->key == GRPC_MDSTR_CONTENT_TYPE) return NULL; + if (md->key == GRPC_MDSTR_USER_AGENT) return NULL; return md; } @@ -120,16 +111,18 @@ static void hc_mutate_op(grpc_call_element *elem, /* Send : prefixed headers, which have to be before any application layer headers. */ grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->method, - GRPC_MDELEM_REF(channeld->method)); + GRPC_MDELEM_METHOD_POST); grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->scheme, - GRPC_MDELEM_REF(channeld->scheme)); + channeld->static_scheme); grpc_metadata_batch_add_tail(op->send_initial_metadata, &calld->te_trailers, - GRPC_MDELEM_REF(channeld->te_trailers)); - grpc_metadata_batch_add_tail(op->send_initial_metadata, - &calld->content_type, - GRPC_MDELEM_REF(channeld->content_type)); - grpc_metadata_batch_add_tail(op->send_initial_metadata, &calld->user_agent, - GRPC_MDELEM_REF(channeld->user_agent)); + GRPC_MDELEM_TE_TRAILERS); + grpc_metadata_batch_add_tail( + op->send_initial_metadata, &calld->content_type, + GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC); + grpc_metadata_batch_add_tail( + op->send_initial_metadata, &calld->user_agent, + GRPC_MDELEM_REF(grpc_mdelem_from_cache(calld->mdctx, + GRPC_MDELEM_CACHED_USER_AGENT))); } if (op->recv_initial_metadata != NULL) { @@ -155,6 +148,7 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_call_element_args *args) { call_data *calld = elem->call_data; calld->on_done_recv = NULL; + calld->mdctx = args->metadata_context; grpc_closure_init(&calld->hc_on_recv, hc_on_recv, elem); } @@ -162,109 +156,39 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {} -static const char *scheme_from_args(const grpc_channel_args *args) { +static grpc_mdelem *scheme_from_args(const grpc_channel_args *args) { unsigned i; + size_t j; + grpc_mdelem *valid_schemes[] = {GRPC_MDELEM_SCHEME_HTTP, + GRPC_MDELEM_SCHEME_HTTPS}; if (args != NULL) { for (i = 0; i < args->num_args; ++i) { if (args->args[i].type == GRPC_ARG_STRING && strcmp(args->args[i].key, GRPC_ARG_HTTP2_SCHEME) == 0) { - return args->args[i].value.string; + for (j = 0; j < GPR_ARRAY_SIZE(valid_schemes); j++) { + if (0 == strcmp(grpc_mdstr_as_c_string(valid_schemes[j]->value), + args->args[i].value.string)) { + return valid_schemes[j]; + } + } } } } - return "http"; -} - -static grpc_mdstr *user_agent_from_args(grpc_mdctx *mdctx, - const grpc_channel_args *args) { - gpr_strvec v; - size_t i; - int is_first = 1; - char *tmp; - grpc_mdstr *result; - - gpr_strvec_init(&v); - - for (i = 0; args && i < args->num_args; i++) { - if (0 == strcmp(args->args[i].key, GRPC_ARG_PRIMARY_USER_AGENT_STRING)) { - if (args->args[i].type != GRPC_ARG_STRING) { - gpr_log(GPR_ERROR, "Channel argument '%s' should be a string", - GRPC_ARG_PRIMARY_USER_AGENT_STRING); - } else { - if (!is_first) gpr_strvec_add(&v, gpr_strdup(" ")); - is_first = 0; - gpr_strvec_add(&v, gpr_strdup(args->args[i].value.string)); - } - } - } - - gpr_asprintf(&tmp, "%sgrpc-c/%s (%s)", is_first ? "" : " ", - grpc_version_string(), GPR_PLATFORM_STRING); - is_first = 0; - gpr_strvec_add(&v, tmp); - - for (i = 0; args && i < args->num_args; i++) { - if (0 == strcmp(args->args[i].key, GRPC_ARG_SECONDARY_USER_AGENT_STRING)) { - if (args->args[i].type != GRPC_ARG_STRING) { - gpr_log(GPR_ERROR, "Channel argument '%s' should be a string", - GRPC_ARG_SECONDARY_USER_AGENT_STRING); - } else { - if (!is_first) gpr_strvec_add(&v, gpr_strdup(" ")); - is_first = 0; - gpr_strvec_add(&v, gpr_strdup(args->args[i].value.string)); - } - } - } - - tmp = gpr_strvec_flatten(&v, NULL); - gpr_strvec_destroy(&v); - result = grpc_mdstr_from_string(mdctx, tmp); - gpr_free(tmp); - - return result; + return GRPC_MDELEM_SCHEME_HTTP; } /* Constructor for channel_data */ static void init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_channel_element_args *args) { - /* grab pointers to our data from the channel element */ - channel_data *channeld = elem->channel_data; - - /* The first and the last filters tend to be implemented differently to - handle the case that there's no 'next' filter to call on the up or down - path */ + channel_data *chand = elem->channel_data; GPR_ASSERT(!args->is_last); - - /* initialize members */ - channeld->te_trailers = - grpc_mdelem_from_strings(args->metadata_context, "te", "trailers"); - channeld->method = - grpc_mdelem_from_strings(args->metadata_context, ":method", "POST"); - channeld->scheme = grpc_mdelem_from_strings( - args->metadata_context, ":scheme", scheme_from_args(args->channel_args)); - channeld->content_type = grpc_mdelem_from_strings( - args->metadata_context, "content-type", "application/grpc"); - channeld->status = - grpc_mdelem_from_strings(args->metadata_context, ":status", "200"); - channeld->user_agent = grpc_mdelem_from_metadata_strings( - args->metadata_context, - grpc_mdstr_from_string(args->metadata_context, "user-agent"), - user_agent_from_args(args->metadata_context, args->channel_args)); + chand->static_scheme = scheme_from_args(args->channel_args); } /* Destructor for channel data */ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) { - /* grab pointers to our data from the channel element */ - channel_data *channeld = elem->channel_data; - - GRPC_MDELEM_UNREF(channeld->te_trailers); - GRPC_MDELEM_UNREF(channeld->method); - GRPC_MDELEM_UNREF(channeld->scheme); - GRPC_MDELEM_UNREF(channeld->content_type); - GRPC_MDELEM_UNREF(channeld->status); - GRPC_MDELEM_UNREF(channeld->user_agent); } const grpc_channel_filter grpc_http_client_filter = { diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index 2adfe2bb61..c9db9470e0 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -37,6 +37,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include "src/core/profiling/timers.h" +#include "src/core/transport/static_metadata.h" typedef struct call_data { gpr_uint8 seen_path; @@ -55,24 +56,10 @@ typedef struct call_data { up-call on transport_op, and remember to call our on_done_recv member after handling it. */ grpc_closure hs_on_recv; + grpc_mdctx *mdctx; } call_data; -typedef struct channel_data { - grpc_mdelem *te_trailers; - grpc_mdelem *method_post; - grpc_mdelem *http_scheme; - grpc_mdelem *https_scheme; - /* TODO(klempner): Remove this once we stop using it */ - grpc_mdelem *grpc_scheme; - grpc_mdelem *content_type; - grpc_mdelem *status_ok; - grpc_mdelem *status_not_found; - grpc_mdstr *path_key; - grpc_mdstr *authority_key; - grpc_mdstr *host_key; - - grpc_mdctx *mdctx; -} channel_data; +typedef struct channel_data { gpr_uint8 unused; } channel_data; typedef struct { grpc_call_element *elem; @@ -82,25 +69,24 @@ typedef struct { static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { server_filter_args *a = user_data; grpc_call_element *elem = a->elem; - channel_data *channeld = elem->channel_data; call_data *calld = elem->call_data; /* Check if it is one of the headers we care about. */ - if (md == channeld->te_trailers || md == channeld->method_post || - md == channeld->http_scheme || md == channeld->https_scheme || - md == channeld->grpc_scheme || md == channeld->content_type) { + if (md == GRPC_MDELEM_TE_TRAILERS || md == GRPC_MDELEM_METHOD_POST || + md == GRPC_MDELEM_SCHEME_HTTP || md == GRPC_MDELEM_SCHEME_HTTPS || + md == GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC) { /* swallow it */ - if (md == channeld->method_post) { + if (md == GRPC_MDELEM_METHOD_POST) { calld->seen_post = 1; - } else if (md->key == channeld->http_scheme->key) { + } else if (md->key == GRPC_MDSTR_SCHEME) { calld->seen_scheme = 1; - } else if (md == channeld->te_trailers) { + } else if (md == GRPC_MDELEM_TE_TRAILERS) { calld->seen_te_trailers = 1; } /* TODO(klempner): Track that we've seen all the headers we should require */ return NULL; - } else if (md->key == channeld->content_type->key) { + } else if (md->key == GRPC_MDSTR_CONTENT_TYPE) { if (strncmp(grpc_mdstr_as_c_string(md->value), "application/grpc+", 17) == 0) { /* Although the C implementation doesn't (currently) generate them, @@ -112,12 +98,11 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { /* TODO(klempner): We're currently allowing this, but we shouldn't see it without a proxy so log for now. */ gpr_log(GPR_INFO, "Unexpected content-type %s", - channeld->content_type->key); + grpc_mdstr_as_c_string(md->value)); } return NULL; - } else if (md->key == channeld->te_trailers->key || - md->key == channeld->method_post->key || - md->key == channeld->http_scheme->key) { + } else if (md->key == GRPC_MDSTR_TE || md->key == GRPC_MDSTR_METHOD || + md->key == GRPC_MDSTR_SCHEME) { gpr_log(GPR_ERROR, "Invalid %s: header: '%s'", grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value)); /* swallow it and error everything out. */ @@ -125,22 +110,21 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { on the wire here. */ grpc_call_element_send_cancel(a->exec_ctx, elem); return NULL; - } else if (md->key == channeld->path_key) { + } else if (md->key == GRPC_MDSTR_PATH) { if (calld->seen_path) { gpr_log(GPR_ERROR, "Received :path twice"); return NULL; } calld->seen_path = 1; return md; - } else if (md->key == channeld->authority_key) { + } else if (md->key == GRPC_MDSTR_AUTHORITY) { calld->seen_authority = 1; return md; - } else if (md->key == channeld->host_key) { + } else if (md->key == GRPC_MDSTR_HOST) { /* translate host to :authority since :authority may be omitted */ grpc_mdelem *authority = grpc_mdelem_from_metadata_strings( - channeld->mdctx, GRPC_MDSTR_REF(channeld->authority_key), - GRPC_MDSTR_REF(md->value)); + calld->mdctx, GRPC_MDSTR_AUTHORITY, GRPC_MDSTR_REF(md->value)); GRPC_MDELEM_UNREF(md); calld->seen_authority = 1; return authority; @@ -191,15 +175,14 @@ static void hs_mutate_op(grpc_call_element *elem, grpc_transport_stream_op *op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; - channel_data *channeld = elem->channel_data; if (op->send_initial_metadata != NULL && !calld->sent_status) { calld->sent_status = 1; grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->status, - GRPC_MDELEM_REF(channeld->status_ok)); - grpc_metadata_batch_add_tail(op->send_initial_metadata, - &calld->content_type, - GRPC_MDELEM_REF(channeld->content_type)); + GRPC_MDELEM_STATUS_200); + grpc_metadata_batch_add_tail( + op->send_initial_metadata, &calld->content_type, + GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC); } if (op->recv_initial_metadata) { @@ -228,6 +211,7 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, /* initialize members */ memset(calld, 0, sizeof(*calld)); grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem); + calld->mdctx = args->metadata_context; } /* Destructor for call_data */ @@ -238,56 +222,12 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, static void init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_channel_element_args *args) { - /* grab pointers to our data from the channel element */ - channel_data *channeld = elem->channel_data; - - /* The first and the last filters tend to be implemented differently to - handle the case that there's no 'next' filter to call on the up or down - path */ GPR_ASSERT(!args->is_last); - - /* initialize members */ - channeld->te_trailers = - grpc_mdelem_from_strings(args->metadata_context, "te", "trailers"); - channeld->status_ok = - grpc_mdelem_from_strings(args->metadata_context, ":status", "200"); - channeld->status_not_found = - grpc_mdelem_from_strings(args->metadata_context, ":status", "404"); - channeld->method_post = - grpc_mdelem_from_strings(args->metadata_context, ":method", "POST"); - channeld->http_scheme = - grpc_mdelem_from_strings(args->metadata_context, ":scheme", "http"); - channeld->https_scheme = - grpc_mdelem_from_strings(args->metadata_context, ":scheme", "https"); - channeld->grpc_scheme = - grpc_mdelem_from_strings(args->metadata_context, ":scheme", "grpc"); - channeld->path_key = grpc_mdstr_from_string(args->metadata_context, ":path"); - channeld->authority_key = - grpc_mdstr_from_string(args->metadata_context, ":authority"); - channeld->host_key = grpc_mdstr_from_string(args->metadata_context, "host"); - channeld->content_type = grpc_mdelem_from_strings( - args->metadata_context, "content-type", "application/grpc"); - - channeld->mdctx = args->metadata_context; } /* Destructor for channel data */ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) { - /* grab pointers to our data from the channel element */ - channel_data *channeld = elem->channel_data; - - GRPC_MDELEM_UNREF(channeld->te_trailers); - GRPC_MDELEM_UNREF(channeld->status_ok); - GRPC_MDELEM_UNREF(channeld->status_not_found); - GRPC_MDELEM_UNREF(channeld->method_post); - GRPC_MDELEM_UNREF(channeld->http_scheme); - GRPC_MDELEM_UNREF(channeld->https_scheme); - GRPC_MDELEM_UNREF(channeld->grpc_scheme); - GRPC_MDELEM_UNREF(channeld->content_type); - GRPC_MDSTR_UNREF(channeld->path_key); - GRPC_MDSTR_UNREF(channeld->authority_key); - GRPC_MDSTR_UNREF(channeld->host_key); } const grpc_channel_filter grpc_http_server_filter = { diff --git a/src/core/channel/subchannel_call_holder.c b/src/core/channel/subchannel_call_holder.c index 7251714519..c5340e0eaf 100644 --- a/src/core/channel/subchannel_call_holder.c +++ b/src/core/channel/subchannel_call_holder.c @@ -58,7 +58,7 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, void grpc_subchannel_call_holder_init( grpc_subchannel_call_holder *holder, grpc_subchannel_call_holder_pick_subchannel pick_subchannel, - void *pick_subchannel_arg) { + void *pick_subchannel_arg, grpc_mdctx *mdctx) { gpr_atm_rel_store(&holder->subchannel_call, 0); holder->pick_subchannel = pick_subchannel; holder->pick_subchannel_arg = pick_subchannel_arg; @@ -68,6 +68,7 @@ void grpc_subchannel_call_holder_init( holder->waiting_ops_count = 0; holder->waiting_ops_capacity = 0; holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; + holder->mdctx = mdctx; } void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx, @@ -156,9 +157,9 @@ retry: holder->subchannel != NULL) { holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL; grpc_closure_init(&holder->next_step, call_ready, holder); - if (grpc_subchannel_create_call(exec_ctx, holder->subchannel, - holder->pollset, &holder->subchannel_call, - &holder->next_step)) { + if (grpc_subchannel_create_call( + exec_ctx, holder->subchannel, holder->pollset, holder->mdctx, + &holder->subchannel_call, &holder->next_step)) { /* got one immediately - continue the op (and any waiting ops) */ holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; retry_waiting_locked(exec_ctx, holder); @@ -184,9 +185,9 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, int success) { fail_locked(exec_ctx, holder); } else { grpc_closure_init(&holder->next_step, call_ready, holder); - if (grpc_subchannel_create_call(exec_ctx, holder->subchannel, - holder->pollset, &holder->subchannel_call, - &holder->next_step)) { + if (grpc_subchannel_create_call( + exec_ctx, holder->subchannel, holder->pollset, holder->mdctx, + &holder->subchannel_call, &holder->next_step)) { holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; /* got one immediately - continue the op (and any waiting ops) */ retry_waiting_locked(exec_ctx, holder); diff --git a/src/core/channel/subchannel_call_holder.h b/src/core/channel/subchannel_call_holder.h index bda051c566..a770be257c 100644 --- a/src/core/channel/subchannel_call_holder.h +++ b/src/core/channel/subchannel_call_holder.h @@ -68,6 +68,8 @@ typedef struct grpc_subchannel_call_holder { grpc_subchannel_call_holder_pick_subchannel pick_subchannel; void *pick_subchannel_arg; + grpc_mdctx *mdctx; + gpr_mu mu; grpc_subchannel_call_holder_creation_phase creation_phase; @@ -84,7 +86,7 @@ typedef struct grpc_subchannel_call_holder { void grpc_subchannel_call_holder_init( grpc_subchannel_call_holder *holder, grpc_subchannel_call_holder_pick_subchannel pick_subchannel, - void *pick_subchannel_arg); + void *pick_subchannel_arg, grpc_mdctx *mdctx); void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel_call_holder *holder); |