diff options
author | yang-g <yangg@google.com> | 2015-12-02 13:23:33 -0800 |
---|---|---|
committer | yang-g <yangg@google.com> | 2015-12-02 13:23:33 -0800 |
commit | d88e1d8f4e42764e17aa2bed3a85b39f68673c03 (patch) | |
tree | 57cf6860d1c67257ccedc8b762f744e9e4df4a69 /src/core/surface | |
parent | 768999d0a351e2fd0a691d6391ff767228135123 (diff) | |
parent | 9fac2afdafe1355b02a355b06955b21d2a2cd004 (diff) |
merge with head and resolve conflict
Diffstat (limited to 'src/core/surface')
-rw-r--r-- | src/core/surface/byte_buffer_reader.c | 1 | ||||
-rw-r--r-- | src/core/surface/call.c | 62 | ||||
-rw-r--r-- | src/core/surface/channel.c | 115 | ||||
-rw-r--r-- | src/core/surface/channel.h | 11 | ||||
-rw-r--r-- | src/core/surface/channel_create.c | 39 | ||||
-rw-r--r-- | src/core/surface/init.c | 2 | ||||
-rw-r--r-- | src/core/surface/lame_client.c | 12 | ||||
-rw-r--r-- | src/core/surface/secure_channel_create.c | 44 | ||||
-rw-r--r-- | src/core/surface/server.c | 21 | ||||
-rw-r--r-- | src/core/surface/server.h | 2 | ||||
-rw-r--r-- | src/core/surface/server_chttp2.c | 13 |
11 files changed, 124 insertions, 198 deletions
diff --git a/src/core/surface/byte_buffer_reader.c b/src/core/surface/byte_buffer_reader.c index 9f830df68c..57417f41b0 100644 --- a/src/core/surface/byte_buffer_reader.c +++ b/src/core/surface/byte_buffer_reader.c @@ -121,4 +121,3 @@ gpr_slice grpc_byte_buffer_reader_readall(grpc_byte_buffer_reader *reader) { } return out_slice; } - diff --git a/src/core/surface/call.c b/src/core/surface/call.c index aa435d44d3..4affafa585 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -43,6 +43,7 @@ #include <grpc/support/useful.h> #include "src/core/channel/channel_stack.h" +#include "src/core/compression/algorithm_metadata.h" #include "src/core/iomgr/timer.h" #include "src/core/profiling/timers.h" #include "src/core/support/string.h" @@ -50,6 +51,7 @@ #include "src/core/surface/call.h" #include "src/core/surface/channel.h" #include "src/core/surface/completion_queue.h" +#include "src/core/transport/static_metadata.h" /** The maximum number of concurrent batches possible. Based upon the maximum number of individually queueable ops in the batch @@ -135,7 +137,6 @@ struct grpc_call { grpc_channel *channel; grpc_call *parent; grpc_call *first_child; - grpc_mdctx *metadata_context; /* TODO(ctiller): share with cq if possible? */ gpr_mu mu; @@ -267,7 +268,6 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, } call->send_deadline = send_deadline; GRPC_CHANNEL_INTERNAL_REF(channel, "call"); - call->metadata_context = grpc_channel_get_metadata_context(channel); /* initial refcount dropped by grpc_call_destroy */ grpc_call_stack_init(&exec_ctx, channel_stack, 1, destroy_call, call, call->context, server_transport_data, @@ -567,9 +567,8 @@ static int prepare_application_metadata(grpc_call *call, int count, grpc_metadata *md = &metadata[i]; grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data; GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data)); - l->md = grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key, - (const gpr_uint8 *)md->value, - md->value_length); + l->md = grpc_mdelem_from_string_and_buffer( + md->key, (const gpr_uint8 *)md->value, md->value_length); if (!grpc_mdstr_is_legal_header(l->md->key)) { gpr_log(GPR_ERROR, "attempt to send invalid metadata key: %s", grpc_mdstr_as_c_string(l->md->key)); @@ -712,8 +711,7 @@ static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, grpc_status_code status, const char *description) { grpc_mdstr *details = - description ? grpc_mdstr_from_string(c->metadata_context, description) - : NULL; + description ? grpc_mdstr_from_string(description) : NULL; cancel_closure *cc = gpr_malloc(sizeof(*cc)); GPR_ASSERT(status != GRPC_STATUS_OK); @@ -788,8 +786,12 @@ static void destroy_status(void *ignored) {} static gpr_uint32 decode_status(grpc_mdelem *md) { gpr_uint32 status; - void *user_data = grpc_mdelem_get_user_data(md, destroy_status); - if (user_data) { + void *user_data; + if (md == GRPC_MDELEM_GRPC_STATUS_0) return 0; + if (md == GRPC_MDELEM_GRPC_STATUS_1) return 1; + if (md == GRPC_MDELEM_GRPC_STATUS_2) return 2; + user_data = grpc_mdelem_get_user_data(md, destroy_status); + if (user_data != NULL) { status = ((gpr_uint32)(gpr_intptr)user_data) - STATUS_OFFSET; } else { if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value), @@ -803,38 +805,23 @@ static gpr_uint32 decode_status(grpc_mdelem *md) { return status; } -/* just as for status above, we need to offset: metadata userdata can't hold a - * zero (null), which in this case is used to signal no compression */ -#define COMPRESS_OFFSET 1 -static void destroy_compression(void *ignored) {} - static gpr_uint32 decode_compression(grpc_mdelem *md) { - grpc_compression_algorithm algorithm; - void *user_data = grpc_mdelem_get_user_data(md, destroy_compression); - if (user_data) { - algorithm = - ((grpc_compression_algorithm)(gpr_intptr)user_data) - COMPRESS_OFFSET; - } else { + grpc_compression_algorithm algorithm = + grpc_compression_algorithm_from_mdstr(md->value); + if (algorithm == GRPC_COMPRESS_ALGORITHMS_COUNT) { const char *md_c_str = grpc_mdstr_as_c_string(md->value); - if (!grpc_compression_algorithm_parse(md_c_str, strlen(md_c_str), - &algorithm)) { - gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str); - assert(0); - } - grpc_mdelem_set_user_data( - md, destroy_compression, - (void *)(gpr_intptr)(algorithm + COMPRESS_OFFSET)); + gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str); } return algorithm; } static grpc_mdelem *recv_common_filter(grpc_call *call, grpc_mdelem *elem) { - if (elem->key == grpc_channel_get_status_string(call->channel)) { + if (elem->key == GRPC_MDSTR_GRPC_STATUS) { GPR_TIMER_BEGIN("status", 0); set_status_code(call, STATUS_FROM_WIRE, decode_status(elem)); GPR_TIMER_END("status", 0); return NULL; - } else if (elem->key == grpc_channel_get_message_string(call->channel)) { + } else if (elem->key == GRPC_MDSTR_GRPC_MESSAGE) { GPR_TIMER_BEGIN("status-details", 0); set_status_details(call, STATUS_FROM_WIRE, GRPC_MDSTR_REF(elem->value)); GPR_TIMER_END("status-details", 0); @@ -867,14 +854,12 @@ static grpc_mdelem *recv_initial_filter(void *callp, grpc_mdelem *elem) { elem = recv_common_filter(call, elem); if (elem == NULL) { return NULL; - } else if (elem->key == - grpc_channel_get_compression_algorithm_string(call->channel)) { + } else if (elem->key == GRPC_MDSTR_GRPC_ENCODING) { GPR_TIMER_BEGIN("compression_algorithm", 0); set_compression_algorithm(call, decode_compression(elem)); GPR_TIMER_END("compression_algorithm", 0); return NULL; - } else if (elem->key == grpc_channel_get_encodings_accepted_by_peer_string( - call->channel)) { + } else if (elem->key == GRPC_MDSTR_GRPC_ACCEPT_ENCODING) { GPR_TIMER_BEGIN("encodings_accepted_by_peer", 0); set_encodings_accepted_by_peer(call, elem); GPR_TIMER_END("encodings_accepted_by_peer", 0); @@ -922,11 +907,12 @@ static batch_control *allocate_batch_control(grpc_call *call) { size_t i; for (i = 0; i < MAX_CONCURRENT_BATCHES; i++) { if ((call->used_batches & (1 << i)) == 0) { - call->used_batches |= (gpr_uint8)(1 << i); + call->used_batches = + (gpr_uint8)(call->used_batches | (gpr_uint8)(1 << i)); return &call->active_batches[i]; } } - GPR_UNREACHABLE_CODE(return NULL); + return NULL; } static void finish_batch_completion(grpc_exec_ctx *exec_ctx, void *user_data, @@ -1240,10 +1226,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call->channel, op->data.send_status_from_server.status); if (op->data.send_status_from_server.status_details != NULL) { call->send_extra_metadata[1].md = grpc_mdelem_from_metadata_strings( - call->metadata_context, - GRPC_MDSTR_REF(grpc_channel_get_message_string(call->channel)), + GRPC_MDSTR_GRPC_MESSAGE, grpc_mdstr_from_string( - call->metadata_context, op->data.send_status_from_server.status_details)); call->send_extra_metadata_count++; set_status_details( diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index a9a5f828f2..859197412b 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -46,6 +46,7 @@ #include "src/core/surface/api_trace.h" #include "src/core/surface/call.h" #include "src/core/surface/init.h" +#include "src/core/transport/static_metadata.h" /** Cache grpc-status: X mdelems for X = 0..NUM_CACHED_STATUS_ELEMS. * Avoids needing to take a metadata context lock for sending status @@ -64,17 +65,7 @@ struct grpc_channel { int is_client; gpr_refcount refs; gpr_uint32 max_message_length; - grpc_mdctx *metadata_context; - /** mdstr for the grpc-status key */ - grpc_mdstr *grpc_status_string; - grpc_mdstr *grpc_compression_algorithm_string; - grpc_mdstr *grpc_encodings_accepted_by_peer_string; - grpc_mdstr *grpc_message_string; - grpc_mdstr *path_string; - grpc_mdstr *authority_string; grpc_mdelem *default_authority; - /** mdelem for grpc-status: 0 thru grpc-status: 2 */ - grpc_mdelem *grpc_status_elem[NUM_CACHED_STATUS_ELEMS]; gpr_mu registered_call_mu; registered_call *registered_calls; @@ -93,7 +84,7 @@ struct grpc_channel { grpc_channel *grpc_channel_create_from_filters( grpc_exec_ctx *exec_ctx, const char *target, const grpc_channel_filter **filters, size_t num_filters, - const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client) { + const grpc_channel_args *args, int is_client) { size_t i; size_t size = sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters); @@ -104,22 +95,6 @@ grpc_channel *grpc_channel_create_from_filters( channel->is_client = is_client; /* decremented by grpc_channel_destroy */ gpr_ref_init(&channel->refs, 1); - channel->metadata_context = mdctx; - channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status"); - channel->grpc_compression_algorithm_string = - grpc_mdstr_from_string(mdctx, "grpc-encoding"); - channel->grpc_encodings_accepted_by_peer_string = - grpc_mdstr_from_string(mdctx, "grpc-accept-encoding"); - channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message"); - for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) { - char buf[GPR_LTOA_MIN_BUFSIZE]; - gpr_ltoa((long)i, buf); - channel->grpc_status_elem[i] = grpc_mdelem_from_metadata_strings( - mdctx, GRPC_MDSTR_REF(channel->grpc_status_string), - grpc_mdstr_from_string(mdctx, buf)); - } - channel->path_string = grpc_mdstr_from_string(mdctx, ":path"); - channel->authority_string = grpc_mdstr_from_string(mdctx, ":authority"); gpr_mu_init(&channel->registered_call_mu); channel->registered_calls = NULL; @@ -146,7 +121,7 @@ grpc_channel *grpc_channel_create_from_filters( GRPC_MDELEM_UNREF(channel->default_authority); } channel->default_authority = grpc_mdelem_from_strings( - mdctx, ":authority", args->args[i].value.string); + ":authority", args->args[i].value.string); } } else if (0 == strcmp(args->args[i].key, GRPC_SSL_TARGET_NAME_OVERRIDE_ARG)) { @@ -160,7 +135,7 @@ grpc_channel *grpc_channel_create_from_filters( GRPC_ARG_DEFAULT_AUTHORITY); } else { channel->default_authority = grpc_mdelem_from_strings( - mdctx, ":authority", args->args[i].value.string); + ":authority", args->args[i].value.string); } } } @@ -171,14 +146,13 @@ grpc_channel *grpc_channel_create_from_filters( target != NULL) { char *default_authority = grpc_get_default_authority(target); if (default_authority) { - channel->default_authority = grpc_mdelem_from_strings( - channel->metadata_context, ":authority", default_authority); + channel->default_authority = + grpc_mdelem_from_strings(":authority", default_authority); } gpr_free(default_authority); } grpc_channel_stack_init(exec_ctx, filters, num_filters, channel, args, - channel->metadata_context, CHANNEL_STACK_FROM_CHANNEL(channel)); return channel; @@ -227,13 +201,10 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel, GPR_ASSERT(!reserved); return grpc_channel_create_call_internal( channel, parent_call, propagation_mask, cq, - grpc_mdelem_from_metadata_strings( - channel->metadata_context, GRPC_MDSTR_REF(channel->path_string), - grpc_mdstr_from_string(channel->metadata_context, method)), - host ? grpc_mdelem_from_metadata_strings( - channel->metadata_context, - GRPC_MDSTR_REF(channel->authority_string), - grpc_mdstr_from_string(channel->metadata_context, host)) + grpc_mdelem_from_metadata_strings(GRPC_MDSTR_PATH, + grpc_mdstr_from_string(method)), + host ? grpc_mdelem_from_metadata_strings(GRPC_MDSTR_AUTHORITY, + grpc_mdstr_from_string(host)) : NULL, deadline); } @@ -245,15 +216,11 @@ void *grpc_channel_register_call(grpc_channel *channel, const char *method, "grpc_channel_register_call(channel=%p, method=%s, host=%s, reserved=%p)", 4, (channel, method, host, reserved)); GPR_ASSERT(!reserved); - rc->path = grpc_mdelem_from_metadata_strings( - channel->metadata_context, GRPC_MDSTR_REF(channel->path_string), - grpc_mdstr_from_string(channel->metadata_context, method)); - rc->authority = - host ? grpc_mdelem_from_metadata_strings( - channel->metadata_context, - GRPC_MDSTR_REF(channel->authority_string), - grpc_mdstr_from_string(channel->metadata_context, host)) - : NULL; + rc->path = grpc_mdelem_from_metadata_strings(GRPC_MDSTR_PATH, + grpc_mdstr_from_string(method)); + rc->authority = host ? grpc_mdelem_from_metadata_strings( + GRPC_MDSTR_AUTHORITY, grpc_mdstr_from_string(host)) + : NULL; gpr_mu_lock(&channel->registered_call_mu); rc->next = channel->registered_calls; channel->registered_calls = rc; @@ -293,17 +260,7 @@ void grpc_channel_internal_ref(grpc_channel *c) { } static void destroy_channel(grpc_exec_ctx *exec_ctx, grpc_channel *channel) { - size_t i; grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CHANNEL(channel)); - for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) { - GRPC_MDELEM_UNREF(channel->grpc_status_elem[i]); - } - GRPC_MDSTR_UNREF(channel->grpc_status_string); - GRPC_MDSTR_UNREF(channel->grpc_compression_algorithm_string); - GRPC_MDSTR_UNREF(channel->grpc_encodings_accepted_by_peer_string); - GRPC_MDSTR_UNREF(channel->grpc_message_string); - GRPC_MDSTR_UNREF(channel->path_string); - GRPC_MDSTR_UNREF(channel->authority_string); while (channel->registered_calls) { registered_call *rc = channel->registered_calls; channel->registered_calls = rc->next; @@ -316,7 +273,6 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, grpc_channel *channel) { if (channel->default_authority != NULL) { GRPC_MDELEM_UNREF(channel->default_authority); } - grpc_mdctx_unref(channel->metadata_context); gpr_mu_destroy(&channel->registered_call_mu); gpr_free(channel->target); gpr_free(channel); @@ -355,38 +311,19 @@ grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel) { return CHANNEL_STACK_FROM_CHANNEL(channel); } -grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel) { - return channel->metadata_context; -} - -grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel) { - return channel->grpc_status_string; -} - -grpc_mdstr *grpc_channel_get_compression_algorithm_string( - grpc_channel *channel) { - return channel->grpc_compression_algorithm_string; -} - -grpc_mdstr *grpc_channel_get_encodings_accepted_by_peer_string( - grpc_channel *channel) { - return channel->grpc_encodings_accepted_by_peer_string; -} - grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) { - if (i >= 0 && i < NUM_CACHED_STATUS_ELEMS) { - return GRPC_MDELEM_REF(channel->grpc_status_elem[i]); - } else { - char tmp[GPR_LTOA_MIN_BUFSIZE]; - gpr_ltoa(i, tmp); - return grpc_mdelem_from_metadata_strings( - channel->metadata_context, GRPC_MDSTR_REF(channel->grpc_status_string), - grpc_mdstr_from_string(channel->metadata_context, tmp)); + char tmp[GPR_LTOA_MIN_BUFSIZE]; + switch (i) { + case 0: + return GRPC_MDELEM_GRPC_STATUS_0; + case 1: + return GRPC_MDELEM_GRPC_STATUS_1; + case 2: + return GRPC_MDELEM_GRPC_STATUS_2; } -} - -grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel) { - return channel->grpc_message_string; + gpr_ltoa(i, tmp); + return grpc_mdelem_from_metadata_strings(GRPC_MDSTR_GRPC_STATUS, + grpc_mdstr_from_string(tmp)); } gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel) { diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h index e5030d52d2..7dea609ebc 100644 --- a/src/core/surface/channel.h +++ b/src/core/surface/channel.h @@ -40,26 +40,17 @@ grpc_channel *grpc_channel_create_from_filters( grpc_exec_ctx *exec_ctx, const char *target, const grpc_channel_filter **filters, size_t count, - const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client); + const grpc_channel_args *args, int is_client); /** Get a (borrowed) pointer to this channels underlying channel stack */ grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel); -/** Get a (borrowed) pointer to the channel wide metadata context */ -grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel); - /** Get a grpc_mdelem of grpc-status: X where X is the numeric value of status_code. The returned elem is owned by the caller. */ grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int status_code); -grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel); -grpc_mdstr *grpc_channel_get_compression_algorithm_string( - grpc_channel *channel); -grpc_mdstr *grpc_channel_get_encodings_accepted_by_peer_string( - grpc_channel *channel); -grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel); gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel); #ifdef GRPC_CHANNEL_REF_COUNT_DEBUG diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index 51d9130b63..aceb932742 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -37,6 +37,8 @@ #include <string.h> #include <grpc/support/alloc.h> +#include <grpc/support/slice.h> +#include <grpc/support/slice_buffer.h> #include "src/core/census/grpc_filter.h" #include "src/core/channel/channel_args.h" @@ -56,11 +58,11 @@ typedef struct { grpc_closure *notify; grpc_connect_in_args args; grpc_connect_out_args *result; + grpc_closure initial_string_sent; + gpr_slice_buffer initial_string_buffer; grpc_endpoint *tcp; - grpc_mdctx *mdctx; - grpc_closure connected; } connector; @@ -72,18 +74,33 @@ static void connector_ref(grpc_connector *con) { static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) { connector *c = (connector *)con; if (gpr_unref(&c->refs)) { - grpc_mdctx_unref(c->mdctx); + /* c->initial_string_buffer does not need to be destroyed */ gpr_free(c); } } +static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, + int success) { + connector_unref(exec_ctx, arg); +} + static void connected(grpc_exec_ctx *exec_ctx, void *arg, int success) { connector *c = arg; grpc_closure *notify; grpc_endpoint *tcp = c->tcp; if (tcp != NULL) { - c->result->transport = grpc_create_chttp2_transport( - exec_ctx, c->args.channel_args, tcp, c->mdctx, 1); + if (!GPR_SLICE_IS_EMPTY(c->args.initial_connect_string)) { + grpc_closure_init(&c->initial_string_sent, on_initial_connect_string_sent, + c); + gpr_slice_buffer_init(&c->initial_string_buffer); + gpr_slice_buffer_add(&c->initial_string_buffer, + c->args.initial_connect_string); + connector_ref(arg); + grpc_endpoint_write(exec_ctx, tcp, &c->initial_string_buffer, + &c->initial_string_sent); + } + c->result->transport = + grpc_create_chttp2_transport(exec_ctx, c->args.channel_args, tcp, 1); grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL, 0); GPR_ASSERT(c->result->transport); @@ -123,7 +140,6 @@ static const grpc_connector_vtable connector_vtable = { typedef struct { grpc_subchannel_factory base; gpr_refcount refs; - grpc_mdctx *mdctx; grpc_channel_args *merge_args; grpc_channel *master; } subchannel_factory; @@ -139,7 +155,6 @@ static void subchannel_factory_unref(grpc_exec_ctx *exec_ctx, if (gpr_unref(&f->refs)) { GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master, "subchannel_factory"); grpc_channel_args_destroy(f->merge_args); - grpc_mdctx_unref(f->mdctx); gpr_free(f); } } @@ -154,10 +169,7 @@ static grpc_subchannel *subchannel_factory_create_subchannel( grpc_subchannel *s; memset(c, 0, sizeof(*c)); c->base.vtable = &connector_vtable; - c->mdctx = f->mdctx; - grpc_mdctx_ref(c->mdctx); gpr_ref_init(&c->refs, 1); - args->mdctx = f->mdctx; args->args = final_args; args->master = f->master; s = grpc_subchannel_create(&c->base, args); @@ -182,7 +194,6 @@ grpc_channel *grpc_insecure_channel_create(const char *target, const grpc_channel_filter *filters[MAX_FILTERS]; grpc_resolver *resolver; subchannel_factory *f; - grpc_mdctx *mdctx = grpc_mdctx_create(); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; size_t n = 0; GRPC_API_TRACE( @@ -196,14 +207,12 @@ grpc_channel *grpc_insecure_channel_create(const char *target, filters[n++] = &grpc_client_channel_filter; GPR_ASSERT(n <= MAX_FILTERS); - channel = grpc_channel_create_from_filters(&exec_ctx, target, filters, n, - args, mdctx, 1); + channel = + grpc_channel_create_from_filters(&exec_ctx, target, filters, n, args, 1); f = gpr_malloc(sizeof(*f)); f->base.vtable = &subchannel_factory_vtable; gpr_ref_init(&f->refs, 1); - grpc_mdctx_ref(mdctx); - f->mdctx = mdctx; f->merge_args = grpc_channel_args_copy(args); f->master = channel; GRPC_CHANNEL_INTERNAL_REF(f->master, "subchannel_factory"); diff --git a/src/core/surface/init.c b/src/core/surface/init.c index f8cba01cad..04d68620f1 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -93,6 +93,7 @@ void grpc_init(void) { gpr_mu_lock(&g_init_mu); if (++g_initializations == 1) { gpr_time_init(); + grpc_mdctx_global_init(); grpc_lb_policy_registry_init(grpc_pick_first_lb_factory_create()); grpc_register_lb_policy(grpc_pick_first_lb_factory_create()); grpc_register_lb_policy(grpc_round_robin_lb_factory_create()); @@ -147,6 +148,7 @@ void grpc_shutdown(void) { g_all_of_the_plugins[i].destroy(); } } + grpc_mdctx_global_shutdown(); } gpr_mu_unlock(&g_init_mu); } diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index fc458a603d..4a55544ac1 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -49,7 +49,6 @@ typedef struct { } call_data; typedef struct { - grpc_mdctx *mdctx; grpc_channel *master; grpc_status_code error_code; const char *error_message; @@ -60,9 +59,9 @@ static void fill_metadata(grpc_call_element *elem, grpc_metadata_batch *mdb) { channel_data *chand = elem->channel_data; char tmp[GPR_LTOA_MIN_BUFSIZE]; gpr_ltoa(chand->error_code, tmp); - calld->status.md = grpc_mdelem_from_strings(chand->mdctx, "grpc-status", tmp); - calld->details.md = grpc_mdelem_from_strings(chand->mdctx, "grpc-message", - chand->error_message); + calld->status.md = grpc_mdelem_from_strings("grpc-status", tmp); + calld->details.md = + grpc_mdelem_from_strings("grpc-message", chand->error_message); calld->status.prev = calld->details.next = NULL; calld->status.next = &calld->details; calld->details.prev = &calld->status; @@ -115,7 +114,6 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, channel_data *chand = elem->channel_data; GPR_ASSERT(args->is_first); GPR_ASSERT(args->is_last); - chand->mdctx = args->metadata_context; chand->master = args->master; } @@ -139,8 +137,8 @@ grpc_channel *grpc_lame_client_channel_create(const char *target, channel_data *chand; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; static const grpc_channel_filter *filters[] = {&lame_filter}; - channel = grpc_channel_create_from_filters(&exec_ctx, target, filters, 1, - NULL, grpc_mdctx_create(), 1); + channel = + grpc_channel_create_from_filters(&exec_ctx, target, filters, 1, NULL, 1); elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); GRPC_API_TRACE( "grpc_lame_client_channel_create(target=%s, error_code=%d, " diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index e07f04e8fe..c9a54d9237 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -37,6 +37,8 @@ #include <string.h> #include <grpc/support/alloc.h> +#include <grpc/support/slice.h> +#include <grpc/support/slice_buffer.h> #include "src/core/census/grpc_filter.h" #include "src/core/channel/channel_args.h" @@ -61,14 +63,14 @@ typedef struct { grpc_closure *notify; grpc_connect_in_args args; grpc_connect_out_args *result; + grpc_closure initial_string_sent; + gpr_slice_buffer initial_string_buffer; gpr_mu mu; grpc_endpoint *connecting_endpoint; grpc_endpoint *newly_connecting_endpoint; grpc_closure connected_closure; - - grpc_mdctx *mdctx; } connector; static void connector_ref(grpc_connector *con) { @@ -79,7 +81,7 @@ static void connector_ref(grpc_connector *con) { static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) { connector *c = (connector *)con; if (gpr_unref(&c->refs)) { - grpc_mdctx_unref(c->mdctx); + /* c->initial_string_buffer does not need to be destroyed */ gpr_free(c); } } @@ -102,7 +104,7 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, c->connecting_endpoint = NULL; gpr_mu_unlock(&c->mu); c->result->transport = grpc_create_chttp2_transport( - exec_ctx, c->args.channel_args, secure_endpoint, c->mdctx, 1); + exec_ctx, c->args.channel_args, secure_endpoint, 1); grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL, 0); c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *) * 2); @@ -115,6 +117,14 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, notify->cb(exec_ctx, notify->cb_arg, 1); } +static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, + int success) { + connector *c = arg; + grpc_security_connector_do_handshake(exec_ctx, &c->security_connector->base, + c->connecting_endpoint, + on_secure_handshake_done, c); +} + static void connected(grpc_exec_ctx *exec_ctx, void *arg, int success) { connector *c = arg; grpc_closure *notify; @@ -124,8 +134,19 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, int success) { GPR_ASSERT(c->connecting_endpoint == NULL); c->connecting_endpoint = tcp; gpr_mu_unlock(&c->mu); - grpc_security_connector_do_handshake(exec_ctx, &c->security_connector->base, - tcp, on_secure_handshake_done, c); + if (!GPR_SLICE_IS_EMPTY(c->args.initial_connect_string)) { + grpc_closure_init(&c->initial_string_sent, on_initial_connect_string_sent, + c); + gpr_slice_buffer_init(&c->initial_string_buffer); + gpr_slice_buffer_add(&c->initial_string_buffer, + c->args.initial_connect_string); + grpc_endpoint_write(exec_ctx, tcp, &c->initial_string_buffer, + &c->initial_string_sent); + } else { + grpc_security_connector_do_handshake(exec_ctx, + &c->security_connector->base, tcp, + on_secure_handshake_done, c); + } } else { memset(c->result, 0, sizeof(*c->result)); notify = c->notify; @@ -171,7 +192,6 @@ static const grpc_connector_vtable connector_vtable = { typedef struct { grpc_subchannel_factory base; gpr_refcount refs; - grpc_mdctx *mdctx; grpc_channel_args *merge_args; grpc_channel_security_connector *security_connector; grpc_channel *master; @@ -190,7 +210,6 @@ static void subchannel_factory_unref(grpc_exec_ctx *exec_ctx, "subchannel_factory"); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master, "subchannel_factory"); grpc_channel_args_destroy(f->merge_args); - grpc_mdctx_unref(f->mdctx); gpr_free(f); } } @@ -206,13 +225,10 @@ static grpc_subchannel *subchannel_factory_create_subchannel( memset(c, 0, sizeof(*c)); c->base.vtable = &connector_vtable; c->security_connector = f->security_connector; - c->mdctx = f->mdctx; gpr_mu_init(&c->mu); - grpc_mdctx_ref(c->mdctx); gpr_ref_init(&c->refs, 1); args->args = final_args; args->master = f->master; - args->mdctx = f->mdctx; s = grpc_subchannel_create(&c->base, args); grpc_connector_unref(exec_ctx, &c->base); grpc_channel_args_destroy(final_args); @@ -236,7 +252,6 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds, grpc_channel_args *args_copy; grpc_channel_args *new_args_from_connector; grpc_channel_security_connector *security_connector; - grpc_mdctx *mdctx; grpc_resolver *resolver; subchannel_factory *f; #define MAX_FILTERS 3 @@ -266,7 +281,6 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds, target, GRPC_STATUS_INVALID_ARGUMENT, "Failed to create security connector."); } - mdctx = grpc_mdctx_create(); connector_arg = grpc_security_connector_to_arg(&security_connector->base); args_copy = grpc_channel_args_copy_and_add( @@ -280,13 +294,11 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds, GPR_ASSERT(n <= MAX_FILTERS); channel = grpc_channel_create_from_filters(&exec_ctx, target, filters, n, - args_copy, mdctx, 1); + args_copy, 1); f = gpr_malloc(sizeof(*f)); f->base.vtable = &subchannel_factory_vtable; gpr_ref_init(&f->refs, 1); - grpc_mdctx_ref(mdctx); - f->mdctx = mdctx; GRPC_SECURITY_CONNECTOR_REF(&security_connector->base, "subchannel_factory"); f->security_connector = security_connector; f->merge_args = grpc_channel_args_copy(args_copy); diff --git a/src/core/surface/server.c b/src/core/surface/server.c index e9f380083f..cdbd542d9a 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -54,6 +54,7 @@ #include "src/core/surface/completion_queue.h" #include "src/core/surface/init.h" #include "src/core/transport/metadata.h" +#include "src/core/transport/static_metadata.h" typedef struct listener { void *arg; @@ -108,8 +109,6 @@ struct channel_data { grpc_server *server; grpc_connectivity_state connectivity_state; grpc_channel *channel; - grpc_mdstr *path_key; - grpc_mdstr *authority_key; /* linked list of all channels on a server */ channel_data *next; channel_data *prev; @@ -558,12 +557,11 @@ static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { grpc_call_element *elem = user_data; - channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - if (md->key == chand->path_key) { + if (md->key == GRPC_MDSTR_PATH) { calld->path = GRPC_MDSTR_REF(md->value); return NULL; - } else if (md->key == chand->authority_key) { + } else if (md->key == GRPC_MDSTR_AUTHORITY) { calld->host = GRPC_MDSTR_REF(md->value); return NULL; } @@ -718,9 +716,6 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, GPR_ASSERT(!args->is_last); chand->server = NULL; chand->channel = NULL; - chand->path_key = grpc_mdstr_from_string(args->metadata_context, ":path"); - chand->authority_key = - grpc_mdstr_from_string(args->metadata_context, ":authority"); chand->next = chand->prev = chand; chand->registered_methods = NULL; chand->connectivity_state = GRPC_CHANNEL_IDLE; @@ -750,8 +745,6 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, chand->next = chand->prev = chand; maybe_finish_shutdown(exec_ctx, chand->server); gpr_mu_unlock(&chand->server->mu_global); - GRPC_MDSTR_UNREF(chand->path_key); - GRPC_MDSTR_UNREF(chand->authority_key); server_unref(exec_ctx, chand->server); } } @@ -894,7 +887,7 @@ void grpc_server_start(grpc_server *server) { void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, grpc_transport *transport, grpc_channel_filter const **extra_filters, - size_t num_extra_filters, grpc_mdctx *mdctx, + size_t num_extra_filters, const grpc_channel_args *args) { size_t num_filters = s->channel_filter_count + num_extra_filters + 1; grpc_channel_filter const **filters = @@ -929,7 +922,7 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, } channel = grpc_channel_create_from_filters(exec_ctx, NULL, filters, - num_filters, args, mdctx, 0); + num_filters, args, 0); chand = (channel_data *)grpc_channel_stack_element( grpc_channel_get_channel_stack(channel), 0)->channel_data; chand->server = s; @@ -948,8 +941,8 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, chand->registered_methods = gpr_malloc(alloc); memset(chand->registered_methods, 0, alloc); for (rm = s->registered_methods; rm; rm = rm->next) { - host = rm->host ? grpc_mdstr_from_string(mdctx, rm->host) : NULL; - method = grpc_mdstr_from_string(mdctx, rm->method); + host = rm->host ? grpc_mdstr_from_string(rm->host) : NULL; + method = grpc_mdstr_from_string(rm->method); hash = GRPC_MDSTR_KV_HASH(host ? host->hash : 0, method->hash); for (probes = 0; chand->registered_methods[(hash + probes) % slots] .server_registered_method != NULL; diff --git a/src/core/surface/server.h b/src/core/surface/server.h index 4c46d07679..a957fdb360 100644 --- a/src/core/surface/server.h +++ b/src/core/surface/server.h @@ -57,7 +57,7 @@ void grpc_server_add_listener( void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *server, grpc_transport *transport, grpc_channel_filter const **extra_filters, - size_t num_extra_filters, grpc_mdctx *mdctx, + size_t num_extra_filters, const grpc_channel_args *args); const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server); diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c index 580b91573c..990bc4aa23 100644 --- a/src/core/surface/server_chttp2.c +++ b/src/core/surface/server_chttp2.c @@ -44,11 +44,11 @@ #include <grpc/support/useful.h> static void setup_transport(grpc_exec_ctx *exec_ctx, void *server, - grpc_transport *transport, grpc_mdctx *mdctx) { + grpc_transport *transport) { static grpc_channel_filter const *extra_filters[] = { &grpc_http_server_filter}; grpc_server_setup_transport(exec_ctx, server, transport, extra_filters, - GPR_ARRAY_SIZE(extra_filters), mdctx, + GPR_ARRAY_SIZE(extra_filters), grpc_server_get_channel_args(server)); } @@ -61,10 +61,9 @@ static void new_transport(grpc_exec_ctx *exec_ctx, void *server, * (as in server_secure_chttp2.c) needs to add synchronization to avoid this * case. */ - grpc_mdctx *mdctx = grpc_mdctx_create(); grpc_transport *transport = grpc_create_chttp2_transport( - exec_ctx, grpc_server_get_channel_args(server), tcp, mdctx, 0); - setup_transport(exec_ctx, server, transport, mdctx); + exec_ctx, grpc_server_get_channel_args(server), tcp, 0); + setup_transport(exec_ctx, server, transport); grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0); } @@ -107,9 +106,11 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { } for (i = 0; i < resolved->naddrs; i++) { - port_temp = grpc_tcp_server_add_port( + grpc_tcp_listener *listener; + listener = grpc_tcp_server_add_port( tcp, (struct sockaddr *)&resolved->addrs[i].addr, resolved->addrs[i].len); + port_temp = grpc_tcp_listener_get_port(listener); if (port_temp >= 0) { if (port_num == -1) { port_num = port_temp; |