diff options
author | 2015-11-19 17:09:49 -0800 | |
---|---|---|
committer | 2015-11-19 17:09:49 -0800 | |
commit | ebdef9d674b33e7bd117e2a4a2da7762c35bbdfc (patch) | |
tree | 4ae8484d1f35f667bef4e55c36560527cc7cbb91 /src/core/surface | |
parent | f6e40fd2aaf50e075a6c91eab33acc9f72bf9ab0 (diff) |
Remove metadata context from the channel stack
Diffstat (limited to 'src/core/surface')
-rw-r--r-- | src/core/surface/call.c | 47 | ||||
-rw-r--r-- | src/core/surface/channel.c | 141 | ||||
-rw-r--r-- | src/core/surface/channel.h | 6 | ||||
-rw-r--r-- | src/core/surface/lame_client.c | 12 | ||||
-rw-r--r-- | src/core/surface/server.c | 13 |
5 files changed, 102 insertions, 117 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index aa435d44d3..de0afb93b3 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -43,6 +43,7 @@ #include <grpc/support/useful.h> #include "src/core/channel/channel_stack.h" +#include "src/core/compression/algorithm_metadata.h" #include "src/core/iomgr/timer.h" #include "src/core/profiling/timers.h" #include "src/core/support/string.h" @@ -50,6 +51,7 @@ #include "src/core/surface/call.h" #include "src/core/surface/channel.h" #include "src/core/surface/completion_queue.h" +#include "src/core/transport/static_metadata.h" /** The maximum number of concurrent batches possible. Based upon the maximum number of individually queueable ops in the batch @@ -271,6 +273,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, /* initial refcount dropped by grpc_call_destroy */ grpc_call_stack_init(&exec_ctx, channel_stack, 1, destroy_call, call, call->context, server_transport_data, + grpc_channel_get_metadata_context(channel), CALL_STACK_FROM_CALL(call)); if (cq != NULL) { GRPC_CQ_INTERNAL_REF(cq, "bind"); @@ -788,8 +791,12 @@ static void destroy_status(void *ignored) {} static gpr_uint32 decode_status(grpc_mdelem *md) { gpr_uint32 status; - void *user_data = grpc_mdelem_get_user_data(md, destroy_status); - if (user_data) { + void *user_data; + if (md == GRPC_MDELEM_GRPC_STATUS_0) return 0; + if (md == GRPC_MDELEM_GRPC_STATUS_1) return 1; + if (md == GRPC_MDELEM_GRPC_STATUS_2) return 2; + user_data = grpc_mdelem_get_user_data(md, destroy_status); + if (user_data != NULL) { status = ((gpr_uint32)(gpr_intptr)user_data) - STATUS_OFFSET; } else { if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value), @@ -803,38 +810,23 @@ static gpr_uint32 decode_status(grpc_mdelem *md) { return status; } -/* just as for status above, we need to offset: metadata userdata can't hold a - * zero (null), which in this case is used to signal no compression */ -#define COMPRESS_OFFSET 1 -static void destroy_compression(void *ignored) {} - static gpr_uint32 decode_compression(grpc_mdelem *md) { - grpc_compression_algorithm algorithm; - void *user_data = grpc_mdelem_get_user_data(md, destroy_compression); - if (user_data) { - algorithm = - ((grpc_compression_algorithm)(gpr_intptr)user_data) - COMPRESS_OFFSET; - } else { + grpc_compression_algorithm algorithm = + grpc_compression_algorithm_from_mdstr(md->value); + if (algorithm == GRPC_COMPRESS_ALGORITHMS_COUNT) { const char *md_c_str = grpc_mdstr_as_c_string(md->value); - if (!grpc_compression_algorithm_parse(md_c_str, strlen(md_c_str), - &algorithm)) { - gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str); - assert(0); - } - grpc_mdelem_set_user_data( - md, destroy_compression, - (void *)(gpr_intptr)(algorithm + COMPRESS_OFFSET)); + gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str); } return algorithm; } static grpc_mdelem *recv_common_filter(grpc_call *call, grpc_mdelem *elem) { - if (elem->key == grpc_channel_get_status_string(call->channel)) { + if (elem->key == GRPC_MDSTR_GRPC_STATUS) { GPR_TIMER_BEGIN("status", 0); set_status_code(call, STATUS_FROM_WIRE, decode_status(elem)); GPR_TIMER_END("status", 0); return NULL; - } else if (elem->key == grpc_channel_get_message_string(call->channel)) { + } else if (elem->key == GRPC_MDSTR_GRPC_MESSAGE) { GPR_TIMER_BEGIN("status-details", 0); set_status_details(call, STATUS_FROM_WIRE, GRPC_MDSTR_REF(elem->value)); GPR_TIMER_END("status-details", 0); @@ -867,14 +859,12 @@ static grpc_mdelem *recv_initial_filter(void *callp, grpc_mdelem *elem) { elem = recv_common_filter(call, elem); if (elem == NULL) { return NULL; - } else if (elem->key == - grpc_channel_get_compression_algorithm_string(call->channel)) { + } else if (elem->key == GRPC_MDSTR_GRPC_ENCODING) { GPR_TIMER_BEGIN("compression_algorithm", 0); set_compression_algorithm(call, decode_compression(elem)); GPR_TIMER_END("compression_algorithm", 0); return NULL; - } else if (elem->key == grpc_channel_get_encodings_accepted_by_peer_string( - call->channel)) { + } else if (elem->key == GRPC_MDSTR_GRPC_ACCEPT_ENCODING) { GPR_TIMER_BEGIN("encodings_accepted_by_peer", 0); set_encodings_accepted_by_peer(call, elem); GPR_TIMER_END("encodings_accepted_by_peer", 0); @@ -1240,8 +1230,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call->channel, op->data.send_status_from_server.status); if (op->data.send_status_from_server.status_details != NULL) { call->send_extra_metadata[1].md = grpc_mdelem_from_metadata_strings( - call->metadata_context, - GRPC_MDSTR_REF(grpc_channel_get_message_string(call->channel)), + call->metadata_context, GRPC_MDSTR_GRPC_MESSAGE, grpc_mdstr_from_string( call->metadata_context, op->data.send_status_from_server.status_details)); diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index a9a5f828f2..0ca877a394 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -46,6 +46,7 @@ #include "src/core/surface/api_trace.h" #include "src/core/surface/call.h" #include "src/core/surface/init.h" +#include "src/core/transport/static_metadata.h" /** Cache grpc-status: X mdelems for X = 0..NUM_CACHED_STATUS_ELEMS. * Avoids needing to take a metadata context lock for sending status @@ -65,16 +66,7 @@ struct grpc_channel { gpr_refcount refs; gpr_uint32 max_message_length; grpc_mdctx *metadata_context; - /** mdstr for the grpc-status key */ - grpc_mdstr *grpc_status_string; - grpc_mdstr *grpc_compression_algorithm_string; - grpc_mdstr *grpc_encodings_accepted_by_peer_string; - grpc_mdstr *grpc_message_string; - grpc_mdstr *path_string; - grpc_mdstr *authority_string; grpc_mdelem *default_authority; - /** mdelem for grpc-status: 0 thru grpc-status: 2 */ - grpc_mdelem *grpc_status_elem[NUM_CACHED_STATUS_ELEMS]; gpr_mu registered_call_mu; registered_call *registered_calls; @@ -90,6 +82,55 @@ struct grpc_channel { /* the protobuf library will (by default) start warning at 100megs */ #define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024) +static grpc_mdstr *user_agent_from_args(grpc_mdctx *mdctx, + const grpc_channel_args *args) { + gpr_strvec v; + size_t i; + int is_first = 1; + char *tmp; + grpc_mdstr *result; + + gpr_strvec_init(&v); + + for (i = 0; args && i < args->num_args; i++) { + if (0 == strcmp(args->args[i].key, GRPC_ARG_PRIMARY_USER_AGENT_STRING)) { + if (args->args[i].type != GRPC_ARG_STRING) { + gpr_log(GPR_ERROR, "Channel argument '%s' should be a string", + GRPC_ARG_PRIMARY_USER_AGENT_STRING); + } else { + if (!is_first) gpr_strvec_add(&v, gpr_strdup(" ")); + is_first = 0; + gpr_strvec_add(&v, gpr_strdup(args->args[i].value.string)); + } + } + } + + gpr_asprintf(&tmp, "%sgrpc-c/%s (%s)", is_first ? "" : " ", + grpc_version_string(), GPR_PLATFORM_STRING); + is_first = 0; + gpr_strvec_add(&v, tmp); + + for (i = 0; args && i < args->num_args; i++) { + if (0 == strcmp(args->args[i].key, GRPC_ARG_SECONDARY_USER_AGENT_STRING)) { + if (args->args[i].type != GRPC_ARG_STRING) { + gpr_log(GPR_ERROR, "Channel argument '%s' should be a string", + GRPC_ARG_SECONDARY_USER_AGENT_STRING); + } else { + if (!is_first) gpr_strvec_add(&v, gpr_strdup(" ")); + is_first = 0; + gpr_strvec_add(&v, gpr_strdup(args->args[i].value.string)); + } + } + } + + tmp = gpr_strvec_flatten(&v, NULL); + gpr_strvec_destroy(&v); + result = grpc_mdstr_from_string(mdctx, tmp); + gpr_free(tmp); + + return result; +} + grpc_channel *grpc_channel_create_from_filters( grpc_exec_ctx *exec_ctx, const char *target, const grpc_channel_filter **filters, size_t num_filters, @@ -105,24 +146,16 @@ grpc_channel *grpc_channel_create_from_filters( /* decremented by grpc_channel_destroy */ gpr_ref_init(&channel->refs, 1); channel->metadata_context = mdctx; - channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status"); - channel->grpc_compression_algorithm_string = - grpc_mdstr_from_string(mdctx, "grpc-encoding"); - channel->grpc_encodings_accepted_by_peer_string = - grpc_mdstr_from_string(mdctx, "grpc-accept-encoding"); - channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message"); - for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) { - char buf[GPR_LTOA_MIN_BUFSIZE]; - gpr_ltoa((long)i, buf); - channel->grpc_status_elem[i] = grpc_mdelem_from_metadata_strings( - mdctx, GRPC_MDSTR_REF(channel->grpc_status_string), - grpc_mdstr_from_string(mdctx, buf)); - } - channel->path_string = grpc_mdstr_from_string(mdctx, ":path"); - channel->authority_string = grpc_mdstr_from_string(mdctx, ":authority"); gpr_mu_init(&channel->registered_call_mu); channel->registered_calls = NULL; + if (is_client) { + grpc_mdctx_set_mdelem_cache( + mdctx, GRPC_MDELEM_CACHED_USER_AGENT, + grpc_mdelem_from_metadata_strings(mdctx, GRPC_MDSTR_USER_AGENT, + user_agent_from_args(mdctx, args))); + } + channel->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH; if (args) { for (i = 0; i < args->num_args; i++) { @@ -178,7 +211,6 @@ grpc_channel *grpc_channel_create_from_filters( } grpc_channel_stack_init(exec_ctx, filters, num_filters, channel, args, - channel->metadata_context, CHANNEL_STACK_FROM_CHANNEL(channel)); return channel; @@ -228,11 +260,10 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel, return grpc_channel_create_call_internal( channel, parent_call, propagation_mask, cq, grpc_mdelem_from_metadata_strings( - channel->metadata_context, GRPC_MDSTR_REF(channel->path_string), + channel->metadata_context, GRPC_MDSTR_PATH, grpc_mdstr_from_string(channel->metadata_context, method)), host ? grpc_mdelem_from_metadata_strings( - channel->metadata_context, - GRPC_MDSTR_REF(channel->authority_string), + channel->metadata_context, GRPC_MDSTR_AUTHORITY, grpc_mdstr_from_string(channel->metadata_context, host)) : NULL, deadline); @@ -246,12 +277,11 @@ void *grpc_channel_register_call(grpc_channel *channel, const char *method, 4, (channel, method, host, reserved)); GPR_ASSERT(!reserved); rc->path = grpc_mdelem_from_metadata_strings( - channel->metadata_context, GRPC_MDSTR_REF(channel->path_string), + channel->metadata_context, GRPC_MDSTR_PATH, grpc_mdstr_from_string(channel->metadata_context, method)); rc->authority = host ? grpc_mdelem_from_metadata_strings( - channel->metadata_context, - GRPC_MDSTR_REF(channel->authority_string), + channel->metadata_context, GRPC_MDSTR_AUTHORITY, grpc_mdstr_from_string(channel->metadata_context, host)) : NULL; gpr_mu_lock(&channel->registered_call_mu); @@ -293,17 +323,7 @@ void grpc_channel_internal_ref(grpc_channel *c) { } static void destroy_channel(grpc_exec_ctx *exec_ctx, grpc_channel *channel) { - size_t i; grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CHANNEL(channel)); - for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) { - GRPC_MDELEM_UNREF(channel->grpc_status_elem[i]); - } - GRPC_MDSTR_UNREF(channel->grpc_status_string); - GRPC_MDSTR_UNREF(channel->grpc_compression_algorithm_string); - GRPC_MDSTR_UNREF(channel->grpc_encodings_accepted_by_peer_string); - GRPC_MDSTR_UNREF(channel->grpc_message_string); - GRPC_MDSTR_UNREF(channel->path_string); - GRPC_MDSTR_UNREF(channel->authority_string); while (channel->registered_calls) { registered_call *rc = channel->registered_calls; channel->registered_calls = rc->next; @@ -316,6 +336,7 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, grpc_channel *channel) { if (channel->default_authority != NULL) { GRPC_MDELEM_UNREF(channel->default_authority); } + grpc_mdctx_drop_caches(channel->metadata_context); grpc_mdctx_unref(channel->metadata_context); gpr_mu_destroy(&channel->registered_call_mu); gpr_free(channel->target); @@ -359,34 +380,20 @@ grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel) { return channel->metadata_context; } -grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel) { - return channel->grpc_status_string; -} - -grpc_mdstr *grpc_channel_get_compression_algorithm_string( - grpc_channel *channel) { - return channel->grpc_compression_algorithm_string; -} - -grpc_mdstr *grpc_channel_get_encodings_accepted_by_peer_string( - grpc_channel *channel) { - return channel->grpc_encodings_accepted_by_peer_string; -} - grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) { - if (i >= 0 && i < NUM_CACHED_STATUS_ELEMS) { - return GRPC_MDELEM_REF(channel->grpc_status_elem[i]); - } else { - char tmp[GPR_LTOA_MIN_BUFSIZE]; - gpr_ltoa(i, tmp); - return grpc_mdelem_from_metadata_strings( - channel->metadata_context, GRPC_MDSTR_REF(channel->grpc_status_string), - grpc_mdstr_from_string(channel->metadata_context, tmp)); + char tmp[GPR_LTOA_MIN_BUFSIZE]; + switch (i) { + case 0: + return GRPC_MDELEM_GRPC_STATUS_0; + case 1: + return GRPC_MDELEM_GRPC_STATUS_1; + case 2: + return GRPC_MDELEM_GRPC_STATUS_2; } -} - -grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel) { - return channel->grpc_message_string; + gpr_ltoa(i, tmp); + return grpc_mdelem_from_metadata_strings( + channel->metadata_context, GRPC_MDSTR_GRPC_STATUS, + grpc_mdstr_from_string(channel->metadata_context, tmp)); } gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel) { diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h index e5030d52d2..8bafce4216 100644 --- a/src/core/surface/channel.h +++ b/src/core/surface/channel.h @@ -54,12 +54,6 @@ grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel); The returned elem is owned by the caller. */ grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int status_code); -grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel); -grpc_mdstr *grpc_channel_get_compression_algorithm_string( - grpc_channel *channel); -grpc_mdstr *grpc_channel_get_encodings_accepted_by_peer_string( - grpc_channel *channel); -grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel); gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel); #ifdef GRPC_CHANNEL_REF_COUNT_DEBUG diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index fc458a603d..96ad012add 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -46,10 +46,10 @@ typedef struct { grpc_linked_mdelem status; grpc_linked_mdelem details; + grpc_mdctx *mdctx; } call_data; typedef struct { - grpc_mdctx *mdctx; grpc_channel *master; grpc_status_code error_code; const char *error_message; @@ -60,8 +60,8 @@ static void fill_metadata(grpc_call_element *elem, grpc_metadata_batch *mdb) { channel_data *chand = elem->channel_data; char tmp[GPR_LTOA_MIN_BUFSIZE]; gpr_ltoa(chand->error_code, tmp); - calld->status.md = grpc_mdelem_from_strings(chand->mdctx, "grpc-status", tmp); - calld->details.md = grpc_mdelem_from_strings(chand->mdctx, "grpc-message", + calld->status.md = grpc_mdelem_from_strings(calld->mdctx, "grpc-status", tmp); + calld->details.md = grpc_mdelem_from_strings(calld->mdctx, "grpc-message", chand->error_message); calld->status.prev = calld->details.next = NULL; calld->status.next = &calld->details; @@ -104,7 +104,10 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, } static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_call_element_args *args) {} + grpc_call_element_args *args) { + call_data *calld = elem->call_data; + calld->mdctx = args->metadata_context; +} static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {} @@ -115,7 +118,6 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, channel_data *chand = elem->channel_data; GPR_ASSERT(args->is_first); GPR_ASSERT(args->is_last); - chand->mdctx = args->metadata_context; chand->master = args->master; } diff --git a/src/core/surface/server.c b/src/core/surface/server.c index e9f380083f..d33c421877 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -54,6 +54,7 @@ #include "src/core/surface/completion_queue.h" #include "src/core/surface/init.h" #include "src/core/transport/metadata.h" +#include "src/core/transport/static_metadata.h" typedef struct listener { void *arg; @@ -108,8 +109,6 @@ struct channel_data { grpc_server *server; grpc_connectivity_state connectivity_state; grpc_channel *channel; - grpc_mdstr *path_key; - grpc_mdstr *authority_key; /* linked list of all channels on a server */ channel_data *next; channel_data *prev; @@ -558,12 +557,11 @@ static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { grpc_call_element *elem = user_data; - channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - if (md->key == chand->path_key) { + if (md->key == GRPC_MDSTR_PATH) { calld->path = GRPC_MDSTR_REF(md->value); return NULL; - } else if (md->key == chand->authority_key) { + } else if (md->key == GRPC_MDSTR_AUTHORITY) { calld->host = GRPC_MDSTR_REF(md->value); return NULL; } @@ -718,9 +716,6 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, GPR_ASSERT(!args->is_last); chand->server = NULL; chand->channel = NULL; - chand->path_key = grpc_mdstr_from_string(args->metadata_context, ":path"); - chand->authority_key = - grpc_mdstr_from_string(args->metadata_context, ":authority"); chand->next = chand->prev = chand; chand->registered_methods = NULL; chand->connectivity_state = GRPC_CHANNEL_IDLE; @@ -750,8 +745,6 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, chand->next = chand->prev = chand; maybe_finish_shutdown(exec_ctx, chand->server); gpr_mu_unlock(&chand->server->mu_global); - GRPC_MDSTR_UNREF(chand->path_key); - GRPC_MDSTR_UNREF(chand->authority_key); server_unref(exec_ctx, chand->server); } } |